clientv3/integration: add more tests on balancer switch, inflight range
Test all possible cases of server shutdown with inflight range requests.
Removed redundant tests in kv_test.go.

Signed-off-by: Gyu-Ho Lee <[email protected]>
gyuho committed Nov 8, 2017
1 parent efb0057 commit a858b0b
Showing 2 changed files with 121 additions and 130 deletions.
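
The eight wrapper tests added in server_shutdown_test.go below enumerate every combination of three booleans passed to testBalancerUnderServerStopInflightRangeOnRestart: linearizable vs. serializable reads, pinning the leader vs. a follower, and stopping the pinned member first vs. last. As a hedged illustration only — this is not part of the commit, the test name is hypothetical, and it assumes the same helper signature plus the file's "fmt" and "testing" imports — the same 2×2×2 matrix could be written as a single table-driven test with subtests:

// Hypothetical table-driven equivalent of the eight wrapper tests
// below; not part of this commit.
func TestBalancerUnderServerStopInflightRangeOnRestartMatrix(t *testing.T) {
for _, linearizable := range []bool{true, false} {
for _, pinLeader := range []bool{true, false} {
for _, stopPinFirst := range []bool{true, false} {
name := fmt.Sprintf("linearizable=%v/pinLeader=%v/stopPinFirst=%v",
linearizable, pinLeader, stopPinFirst)
// Subtests run sequentially, so the loop variables are safe to
// capture directly here.
t.Run(name, func(t *testing.T) {
testBalancerUnderServerStopInflightRangeOnRestart(t, linearizable, pinLeader, stopPinFirst)
})
}
}
}
}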
135 changes: 5 additions & 130 deletions clientv3/integration/kv_test.go
@@ -17,7 +17,6 @@ package integration
import (
"bytes"
"context"
"math/rand"
"os"
"reflect"
"strings"
@@ -715,11 +714,11 @@ func TestKVGetRetry(t *testing.T) {
}
}

// TestKVPutFailGetRetry ensures a get will retry following a failed put.
func TestKVPutFailGetRetry(t *testing.T) {
// TestKVPutFail ensures put fails on stopped node.
func TestKVPutFail(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

kv := clus.Client(0)
@@ -728,30 +727,8 @@ func TestKVPutFailGetRetry(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()
_, err := kv.Put(ctx, "foo", "bar")
if err == nil {
t.Fatalf("got success on disconnected put, wanted error")
}

donec := make(chan struct{})
go func() {
// Get will fail, but reconnect will trigger
gresp, gerr := kv.Get(context.TODO(), "foo")
if gerr != nil {
t.Fatal(gerr)
}
if len(gresp.Kvs) != 0 {
t.Fatalf("bad get kvs: got %+v, want empty", gresp.Kvs)
}
donec <- struct{}{}
}()

time.Sleep(100 * time.Millisecond)
clus.Members[0].Restart(t)

select {
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for get")
case <-donec:
if err != context.DeadlineExceeded {
t.Fatalf("expected %v, got %v", context.DeadlineExceeded, err)
}
}

@@ -825,82 +802,6 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
}
}

// TestKVGetOneEndpointDown ensures a client can connect and get if one endpoint is down.
func TestKVGetOneEndpointDown(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, SkipCreatingClient: true})
defer clus.Terminate(t)

// get endpoint list
eps := make([]string, 3)
for i := range eps {
eps[i] = clus.Members[i].GRPCAddr()
}

// make a dead node
clus.Members[rand.Intn(len(eps))].Stop(t)

// try to connect with dead node in the endpoint list
cfg := clientv3.Config{Endpoints: eps, DialTimeout: 1 * time.Second}
cli, err := clientv3.New(cfg)
if err != nil {
t.Fatal(err)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
t.Fatal(err)
}
cancel()
}

// TestKVGetResetLoneEndpoint ensures that if an endpoint resets and all other
// endpoints are down, then it will reconnect.
func TestKVGetResetLoneEndpoint(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, SkipCreatingClient: true})
defer clus.Terminate(t)

// get endpoint list
eps := make([]string, 2)
for i := range eps {
eps[i] = clus.Members[i].GRPCAddr()
}

cfg := clientv3.Config{Endpoints: eps, DialTimeout: 500 * time.Millisecond}
cli, err := clientv3.New(cfg)
if err != nil {
t.Fatal(err)
}
defer cli.Close()

// disconnect everything
clus.Members[0].Stop(t)
clus.Members[1].Stop(t)

// have Get try to reconnect
donec := make(chan struct{})
go func() {
// 3 seconds is the minimum interval between an endpoint being marked
// unhealthy and being removed from the unhealthy list, so it can
// take more than 5 seconds to unpin and repin an endpoint
// TODO: decrease timeout when the balancer switch is rewritten
ctx, cancel := context.WithTimeout(context.TODO(), 7*time.Second)
if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
t.Fatal(err)
}
cancel()
close(donec)
}()
time.Sleep(500 * time.Millisecond)
clus.Members[0].Restart(t)
select {
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for Get")
case <-donec:
}
}

// TestKVPutAtMostOnce ensures that a Put will only occur at most once
// in the presence of network errors.
func TestKVPutAtMostOnce(t *testing.T) {
@@ -937,29 +838,3 @@ func TestKVPutAtMostOnce(t *testing.T) {
t.Fatalf("expected version <= 10, got %+v", resp.Kvs[0])
}
}

func TestKVSwitchUnavailable(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, SkipCreatingClient: true})
defer clus.Terminate(t)

clus.Members[0].InjectPartition(t, clus.Members[1:]...)
// try to connect with dead node in the endpoint list
cfg := clientv3.Config{
Endpoints: []string{
clus.Members[0].GRPCAddr(),
clus.Members[1].GRPCAddr(),
},
DialTimeout: 1 * time.Second}
cli, err := clientv3.New(cfg)
if err != nil {
t.Fatal(err)
}
defer cli.Close()
timeout := 3 * clus.Members[0].ServerConfig.ReqTimeout()
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
if _, err := cli.Get(ctx, "abc"); err != nil {
t.Fatal(err)
}
cancel()
}
116 changes: 116 additions & 0 deletions clientv3/integration/server_shutdown_test.go
@@ -235,3 +235,119 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl
t.Errorf("failed to finish range request in time %v (timeout %v)", err, timeout)
}
}

func TestBalancerUnderServerStopInflightLinearizableGetOnRestartPinLeaderStopPinFirst(t *testing.T) {
testBalancerUnderServerStopInflightRangeOnRestart(t, true, true, true)
}

func TestBalancerUnderServerStopInflightLinearizableGetOnRestartPinLeaderStopPinLater(t *testing.T) {
testBalancerUnderServerStopInflightRangeOnRestart(t, true, true, false)
}

func TestBalancerUnderServerStopInflightLinearizableGetOnRestartPinFollowerStopPinFirst(t *testing.T) {
testBalancerUnderServerStopInflightRangeOnRestart(t, true, false, true)
}

func TestBalancerUnderServerStopInflightLinearizableGetOnRestartPinFollowerStopPinLater(t *testing.T) {
testBalancerUnderServerStopInflightRangeOnRestart(t, true, false, false)
}

func TestBalancerUnderServerStopInflightSerializableGetOnRestartPinLeaderStopPinFirst(t *testing.T) {
testBalancerUnderServerStopInflightRangeOnRestart(t, false, true, true)
}

func TestBalancerUnderServerStopInflightSerializableGetOnRestartPinLeaderStopPinLater(t *testing.T) {
testBalancerUnderServerStopInflightRangeOnRestart(t, false, true, false)
}

func TestBalancerUnderServerStopInflightSerializableGetOnRestartPinFollowerStopPinFirst(t *testing.T) {
testBalancerUnderServerStopInflightRangeOnRestart(t, false, false, true)
}

func TestBalancerUnderServerStopInflightSerializableGetOnRestartPinFollowerStopPinLater(t *testing.T) {
testBalancerUnderServerStopInflightRangeOnRestart(t, false, false, false)
}

// testBalancerUnderServerStopInflightRangeOnRestart expects an
// inflight range request to reconnect on server restart.
func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizable, pinLeader, stopPinFirst bool) {
defer testutil.AfterTest(t)

cfg := &integration.ClusterConfig{
Size: 2,
SkipCreatingClient: true,
}
if linearizable {
cfg.Size = 3
}

clus := integration.NewClusterV3(t, cfg)
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
if linearizable {
eps = append(eps, clus.Members[2].GRPCAddr())
}

lead := clus.WaitLeader(t)

target := lead
if !pinLeader {
target = (target + 1) % 2
}

// pin eps[target]
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}})
if err != nil {
t.Errorf("failed to create client: %v", err)
}
defer cli.Close()

// wait for eps[target] to be pinned
mustWaitPinReady(t, cli)

// add all eps to the list, so that when the original pinned one fails
// the client can switch to other available eps
cli.SetEndpoints(eps...)

if stopPinFirst {
clus.Members[target].Stop(t)
// give some time for balancer switch before stopping the other
time.Sleep(time.Second)
clus.Members[(target+1)%2].Stop(t)
} else {
clus.Members[(target+1)%2].Stop(t)
// balancer cannot pin other member since it's already stopped
clus.Members[target].Stop(t)
}

// 3 seconds is the minimum interval between an endpoint being marked
// unhealthy and being removed from the unhealthy list, so it can
// take more than 5 seconds to unpin and repin an endpoint
// TODO: decrease timeout when the balancer switch is rewritten
clientTimeout := 7 * time.Second

var clientOpts []clientv3.OpOption
if !linearizable {
clientOpts = append(clientOpts, clientv3.WithSerializable())
}

donec := make(chan struct{})
go func() {
defer close(donec)
ctx, cancel := context.WithTimeout(context.TODO(), clientTimeout)
_, err := cli.Get(ctx, "abc", clientOpts...)
cancel()
if err != nil {
t.Fatal(err)
}
}()

time.Sleep(500 * time.Millisecond)
clus.Members[target].Restart(t)

select {
case <-time.After(clientTimeout + 2*time.Second):
t.Fatalf("timed out waiting for Get")
case <-donec:
}
}
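
For reference, mustWaitPinReady (called above before SetEndpoints) is a helper defined elsewhere in the clientv3/integration package and does not appear in this diff. A minimal sketch of what such a helper might look like, assuming it simply blocks until the single configured endpoint can serve a request — the 3-second timeout and the key used here are assumptions:

// Sketch of the mustWaitPinReady helper referenced above; the real helper
// lives elsewhere in clientv3/integration, so timeout and key are assumed.
func mustWaitPinReady(t *testing.T, cli *clientv3.Client) {
// Any RPC forces the balancer to pin the sole configured endpoint;
// fail the test if it cannot do so within the timeout.
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
defer cancel()
if _, err := cli.Get(ctx, "foo"); err != nil {
t.Fatal(err)
}
}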
