Skip to content

Commit

Permalink
tests: Allow configuring integration tests to use TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Sep 30, 2021
1 parent 7272a95 commit 6e04e8a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 20 deletions.
76 changes: 56 additions & 20 deletions tests/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ type ClusterConfig struct {
// UseBridge adds bridge between client and grpc server. Should be used in tests that
// want to manipulate connection or require connection not breaking despite server stop/restart.
UseBridge bool
// UseTCP configures server listen on tcp socket. If disabled unix socket is used.
UseTCP bool

EnableLeaseCheckpoint bool
LeaseCheckpointInterval time.Duration
Expand Down Expand Up @@ -216,7 +218,7 @@ func newCluster(t testutil.TB, cfg *ClusterConfig) *cluster {
c := &cluster{cfg: cfg}
ms := make([]*member, cfg.Size)
for i := 0; i < cfg.Size; i++ {
ms[i] = c.mustNewMember(t, int32(i))
ms[i] = c.mustNewMember(t, int64(i))
}
c.Members = ms
if err := c.fillClusterForMembers(); err != nil {
Expand Down Expand Up @@ -303,11 +305,11 @@ func (c *cluster) HTTPMembers() []client.Member {
return ms
}

func (c *cluster) mustNewMember(t testutil.TB, number int32) *member {
func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member {
m := mustNewMember(t,
memberConfig{
name: c.generateMemberName(),
memberNumber: number,
memberNumber: memberNumber,
authToken: c.cfg.AuthToken,
peerTLS: c.cfg.PeerTLS,
clientTLS: c.cfg.ClientTLS,
Expand All @@ -323,6 +325,7 @@ func (c *cluster) mustNewMember(t testutil.TB, number int32) *member {
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
useIP: c.cfg.UseIP,
useBridge: c.cfg.UseBridge,
useTCP: c.cfg.UseTCP,
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
Expand All @@ -338,7 +341,7 @@ func (c *cluster) mustNewMember(t testutil.TB, number int32) *member {

// addMember return PeerURLs of the added member.
func (c *cluster) addMember(t testutil.TB) types.URLs {
m := c.mustNewMember(t,0)
m := c.mustNewMember(t, 0)

scheme := schemeFromTLSInfo(c.cfg.PeerTLS)

Expand Down Expand Up @@ -567,8 +570,8 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {

type member struct {
config.ServerConfig
uniqNumber int32
memberNumber int32
UniqNumber int64
MemberNumber int64
PeerListeners, ClientListeners []net.Listener
grpcListener net.Listener
// PeerTLSInfo enables peer TLS when set
Expand All @@ -595,6 +598,7 @@ type member struct {
clientMaxCallRecvMsgSize int
useIP bool
useBridge bool
useTCP bool

isLearner bool
closed bool
Expand All @@ -604,7 +608,8 @@ func (m *member) GRPCURL() string { return m.grpcURL }

type memberConfig struct {
name string
memberNumber int32
uniqNumber int64
memberNumber int64
peerTLS *transport.TLSInfo
clientTLS *transport.TLSInfo
authToken string
Expand All @@ -620,6 +625,7 @@ type memberConfig struct {
clientMaxCallRecvMsgSize int
useIP bool
useBridge bool
useTCP bool
enableLeaseCheckpoint bool
leaseCheckpointInterval time.Duration
WatchProgressNotifyInterval time.Duration
Expand All @@ -630,8 +636,8 @@ type memberConfig struct {
func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
var err error
m := &member{
uniqNumber: atomic.AddInt32(&uniqueNumber, 1),
memberNumber: mcfg.memberNumber,
MemberNumber: mcfg.memberNumber,
UniqNumber: atomic.AddInt64(&localListenCount, 1),
}

peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
Expand Down Expand Up @@ -717,6 +723,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
m.useIP = mcfg.useIP
m.useBridge = mcfg.useBridge
m.useTCP = mcfg.useTCP
m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval

Expand Down Expand Up @@ -749,13 +756,14 @@ func memberLogger(t testutil.TB, name string) *zap.Logger {
// listenGRPC starts a grpc server over a unix domain socket on the member
func (m *member) listenGRPC() error {
// prefix with localhost so cert has right domain
grpcAddr := m.grpcAddr()
network, host, port := m.grpcAddr()
grpcAddr := host + ":" + port
m.Logger.Info("LISTEN GRPC", zap.String("grpcAddr", grpcAddr), zap.String("m.Name", m.Name))
grpcListener, err := transport.NewUnixListener(grpcAddr)
grpcListener, err := net.Listen(network, grpcAddr)
if err != nil {
return fmt.Errorf("listen failed on grpc socket %s (%v)", grpcAddr, err)
}
m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + grpcAddr
m.grpcURL = fmt.Sprintf("%s://%s", m.clientScheme(), grpcAddr)
if m.useBridge {
_, err = m.addBridge()
if err != nil {
Expand All @@ -767,20 +775,36 @@ func (m *member) listenGRPC() error {
return nil
}

func (m *member) clientScheme() string {
switch {
case m.useTCP && m.ClientTLSInfo != nil:
return "https"
case m.useTCP && m.ClientTLSInfo == nil:
return "http"
case !m.useTCP && m.ClientTLSInfo != nil:
return "unixs"
case !m.useTCP && m.ClientTLSInfo == nil:
return "unix"
}
m.Logger.Panic("Failed to determine client schema")
return ""
}

func (m *member) addBridge() (*bridge, error) {
grpcAddr := m.grpcAddr()
network, host, port := m.grpcAddr()
grpcAddr := host + ":" + port
bridgeAddr := grpcAddr + "0"
m.Logger.Info("LISTEN BRIDGE", zap.String("grpc-address", bridgeAddr), zap.String("member", m.Name))
bridgeListener, err := transport.NewUnixListener(bridgeAddr)
if err != nil {
return nil, fmt.Errorf("listen failed on bridge socket %s (%v)", grpcAddr, err)
return nil, fmt.Errorf("listen failed on bridge socket %s (%v)", bridgeAddr, err)
}
m.grpcBridge, err = newBridge(dialer{network: "unix", addr: grpcAddr}, bridgeListener)
m.grpcBridge, err = newBridge(dialer{network: network, addr: grpcAddr}, bridgeListener)
if err != nil {
bridgeListener.Close()
return nil, err
}
m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr
m.grpcURL = m.clientScheme() + "://" + bridgeAddr
return m.grpcBridge, nil
}

Expand All @@ -791,13 +815,25 @@ func (m *member) Bridge() *bridge {
return m.grpcBridge
}

func (m *member) grpcAddr() string {
func (m *member) grpcAddr() (network, host, port string) {
// prefix with localhost so cert has right domain
host := "localhost"
host = "localhost"
if m.useIP { // for IP-only TLS certs
host = "127.0.0.1"
}
return fmt.Sprintf("%s:%d", host, baseGRPCPort + m.uniqNumber * 10 + m.memberNumber)
network = "unix"
if m.useTCP {
network = "tcp"
}
port = m.Name
if m.useTCP {
port = fmt.Sprintf("%d", GrpcPortNumber(m.UniqNumber, m.MemberNumber))
}
return network, host, port
}

func GrpcPortNumber(uniqNumber, memberNumber int64) int64 {
return baseGRPCPort + uniqNumber*10 + memberNumber
}

type dialer struct {
Expand Down Expand Up @@ -1573,7 +1609,7 @@ func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j],

// MustNewMember creates a new member instance based on the response of V3 Member Add API.
func (c *ClusterV3) MustNewMember(t testutil.TB, resp *clientv3.MemberAddResponse) *member {
m := c.mustNewMember(t,0)
m := c.mustNewMember(t, 0)
m.isLearner = resp.Member.IsLearner
m.NewCluster = false

Expand Down
3 changes: 3 additions & 0 deletions tests/integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/rand"
"os"
"reflect"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -1594,8 +1595,10 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) {

clus.Members[0].ClientTLSInfo = &testTLSInfo
clus.Members[0].DialOptions = []grpc.DialOption{grpc.WithBlock()}
clus.Members[0].grpcURL = strings.Replace(clus.Members[0].grpcURL, "http://", "https://", 1)
client, err := NewClientV3(clus.Members[0])
if client != nil || err == nil {
client.Close()
t.Fatalf("expected no client")
} else if err != context.DeadlineExceeded {
t.Fatalf("unexpected error (%v)", err)
Expand Down

0 comments on commit 6e04e8a

Please sign in to comment.