diff --git a/clientv3/balancer.go b/clientv3/balancer.go
index 6ae047e9841..32b42ec9f44 100644
--- a/clientv3/balancer.go
+++ b/clientv3/balancer.go
@@ -18,6 +18,7 @@ import (
 	"net/url"
 	"strings"
 	"sync"
+	"time"
 
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
@@ -32,8 +33,10 @@ var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address avai
 // simpleBalancer does the bare minimum to expose multiple eps
 // to the grpc reconnection code path
 type simpleBalancer struct {
-	// addrs are the client's endpoints for grpc
-	addrs []grpc.Address
+	keepAlive bool
+	// addrs are the client's endpoints for grpc,
+	// mapped to connection activity status
+	addrs map[grpc.Address]addrConn
 
 	// notifyCh notifies grpc of the set of addresses for connecting
 	notifyCh chan []grpc.Address
@@ -73,9 +76,9 @@ type simpleBalancer struct {
 
 func newSimpleBalancer(eps []string) *simpleBalancer {
 	notifyCh := make(chan []grpc.Address, 1)
-	addrs := make([]grpc.Address, len(eps))
+	addrs := make(map[grpc.Address]addrConn, len(eps))
 	for i := range eps {
-		addrs[i].Addr = getHost(eps[i])
+		addrs[grpc.Address{Addr: getHost(eps[i])}] = addrConn{active: true, last: time.Now()}
 	}
 	sb := &simpleBalancer{
 		addrs:    addrs,
@@ -136,9 +139,9 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
 
 	b.host2ep = np
 
-	addrs := make([]grpc.Address, 0, len(eps))
+	addrs := make(map[grpc.Address]addrConn, len(eps))
 	for i := range eps {
-		addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
+		addrs[grpc.Address{Addr: getHost(eps[i])}] = addrConn{active: true, last: time.Now()}
 	}
 	b.addrs = addrs
@@ -156,8 +159,8 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
 	}
 }
 
-func hasAddr(addrs []grpc.Address, targetAddr string) bool {
-	for _, addr := range addrs {
+func hasAddr(addrs map[grpc.Address]addrConn, targetAddr string) bool {
+	for addr := range addrs {
 		if targetAddr == addr.Addr {
 			return true
 		}
@@ -165,6 +168,29 @@ func hasAddr(addrs []grpc.Address, targetAddr string) bool {
 	return false
 }
 
+type addrConn struct {
+	active bool
+	last   time.Time
+}
+
+func setActive(addrs map[grpc.Address]addrConn, targetAddr string, active bool) (down bool) {
+	for addr, v := range addrs {
+		if targetAddr == addr.Addr {
+			// TODO: configure interval
+			if !v.active && time.Since(v.last) < time.Minute {
+				return false
+			}
+			ac := addrConn{active: active, last: v.last}
+			if active {
+				ac.last = time.Now()
+			}
+			addrs[addr] = ac
+			return true
+		}
+	}
+	return false
+}
+
 func (b *simpleBalancer) updateNotifyLoop() {
 	defer close(b.donec)
 
@@ -221,7 +247,14 @@ func (b *simpleBalancer) updateNotifyLoop() {
 
 func (b *simpleBalancer) notifyAddrs() {
 	b.mu.RLock()
-	addrs := b.addrs
+	multi := len(b.addrs) > 1 // if single, retry the only endpoint
+	addrs := make([]grpc.Address, 0, len(b.addrs))
+	for addr, ac := range b.addrs {
+		if b.keepAlive && multi && !ac.active {
+			continue
+		}
+		addrs = append(addrs, addr)
+	}
 	b.mu.RUnlock()
 	select {
 	case b.notifyCh <- addrs:
@@ -229,6 +262,9 @@
 	}
 }
 
+// Up is called by gRPC client after address connection state
+// becomes connectivity.Ready. This is after HTTP/2 client
+// establishes the transport.
 func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
@@ -244,6 +280,11 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	if !hasAddr(b.addrs, addr.Addr) {
 		return func(err error) {}
 	}
+	if !setActive(b.addrs, addr.Addr, true) { // mark connectivity state as active
+		// it is possible that Up is called before gRPC receives Notify()
+		// and tears down keepalive timed-out endpoints
+		return func(err error) {}
+	}
 	if b.pinAddr != "" {
 		return func(err error) {}
 	}
@@ -255,8 +296,15 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	b.readyOnce.Do(func() { close(b.readyc) })
 	return func(err error) {
 		b.mu.Lock()
+		if b.keepAlive &&
+			(err.Error() == "grpc: failed with network I/O error" ||
+				err.Error() == "grpc: the connection is drained") {
+			// set as connectivity.TransientFailure until next Up
+			// TODO: undo this when connection is up
+			setActive(b.addrs, addr.Addr, false)
+		}
 		b.upc = make(chan struct{})
-		close(b.downc)
+		close(b.downc) // trigger notifyAddrs
 		b.pinAddr = ""
 		b.mu.Unlock()
 	}
diff --git a/clientv3/client.go b/clientv3/client.go
index 1f8c83f5750..66a3af54016 100644
--- a/clientv3/client.go
+++ b/clientv3/client.go
@@ -376,6 +376,10 @@ func newClient(cfg *Config) (*Client, error) {
 	}
 
 	client.balancer = newSimpleBalancer(cfg.Endpoints)
+	client.balancer.mu.Lock()
+	client.balancer.keepAlive = cfg.DialKeepAliveTime > 0
+	client.balancer.mu.Unlock()
+
 	// use Endpoints[0] so that for https:// without any tls config given, then
 	// grpc will assume the ServerName is in the endpoint.
 	conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
diff --git a/clientv3/integration/watch_keepalive_test.go b/clientv3/integration/watch_keepalive_test.go
new file mode 100644
index 00000000000..627dd78e984
--- /dev/null
+++ b/clientv3/integration/watch_keepalive_test.go
@@ -0,0 +1,72 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !cluster_proxy
+
+package integration
+
+import (
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/integration"
+	"github.com/coreos/etcd/pkg/testutil"
+	"golang.org/x/net/context"
+)
+
+// TestWatchKeepAlive ensures that a watch discovers it cannot talk to the server
+// and then switches to another endpoint with keep-alive parameters.
+// TODO: test with '-tags cluster_proxy'
+func TestWatchKeepAlive(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
+		Size:                  3,
+		GRPCKeepAliveMinTime:  time.Millisecond, // avoid too_many_pings
+		GRPCKeepAliveInterval: 5 * time.Second,  // server-to-client ping
+		GRPCKeepAliveTimeout:  time.Millisecond,
+	})
+	defer clus.Terminate(t)
+
+	ccfg := clientv3.Config{
+		Endpoints:            []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()},
+		DialKeepAliveTime:    5 * time.Second,
+		DialKeepAliveTimeout: time.Nanosecond,
+	}
+	cli, err := clientv3.New(ccfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cli.Close()
+
+	wch := cli.Watch(context.Background(), "foo", clientv3.WithCreatedNotify())
+	if _, ok := <-wch; !ok {
+		t.Fatalf("watch failed")
+	}
+	clus.Members[0].Blackhole()
+
+	// expect 'cli' to switch endpoints from keepalive ping
+	// give enough time for slow machine
+	time.Sleep(ccfg.DialKeepAliveTime + 3*time.Second)
+
+	if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
+		t.Fatal(err)
+	}
+	select {
+	case <-wch:
+	case <-time.After(3 * time.Second):
+		t.Fatal("took too long to receive events")
+	}
+}
diff --git a/embed/config.go b/embed/config.go
index 8e40eb3601f..2b71535e0fb 100644
--- a/embed/config.go
+++ b/embed/config.go
@@ -22,6 +22,7 @@ import (
 	"net/url"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/pkg/cors"
@@ -92,6 +93,24 @@ type Config struct {
 	MaxTxnOps       uint `json:"max-txn-ops"`
 	MaxRequestBytes uint `json:"max-request-bytes"`
 
+	// gRPC server options
+
+	// GRPCKeepAliveMinTime is the minimum interval that a client should
+	// wait before pinging server.
+	// When client pings "too fast", server sends goaway and closes the
+	// connection (errors: too_many_pings, http2.ErrCodeEnhanceYourCalm).
+	// When too slow, nothing happens.
+	// Server expects client pings only when there are active streams,
+	// since 'PermitWithoutStream' is set to false.
+	GRPCKeepAliveMinTime time.Duration `json:"grpc-keepalive-min-time"`
+	// GRPCKeepAliveInterval is the frequency of server-to-client ping
+	// to check if a connection is alive. Close a non-responsive connection
+	// after an additional duration of Timeout.
+	GRPCKeepAliveInterval time.Duration `json:"grpc-keepalive-interval"`
+	// GRPCKeepAliveTimeout is the additional duration of wait
+	// before closing a non-responsive connection.
+	GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`
+
 	// clustering
 	APUrls, ACUrls []url.URL
@@ -175,25 +194,26 @@ func NewConfig() *Config {
 	lcurl, _ := url.Parse(DefaultListenClientURLs)
 	acurl, _ := url.Parse(DefaultAdvertiseClientURLs)
 	cfg := &Config{
-		CorsInfo:            &cors.CORSInfo{},
-		MaxSnapFiles:        DefaultMaxSnapshots,
-		MaxWalFiles:         DefaultMaxWALs,
-		Name:                DefaultName,
-		SnapCount:           etcdserver.DefaultSnapCount,
-		MaxTxnOps:           DefaultMaxTxnOps,
-		MaxRequestBytes:     DefaultMaxRequestBytes,
-		TickMs:              100,
-		ElectionMs:          1000,
-		LPUrls:              []url.URL{*lpurl},
-		LCUrls:              []url.URL{*lcurl},
-		APUrls:              []url.URL{*apurl},
-		ACUrls:              []url.URL{*acurl},
-		ClusterState:        ClusterStateFlagNew,
-		InitialClusterToken: "etcd-cluster",
-		StrictReconfigCheck: true,
-		Metrics:             "basic",
-		EnableV2:            true,
-		AuthToken:           "simple",
+		CorsInfo:             &cors.CORSInfo{},
+		MaxSnapFiles:         DefaultMaxSnapshots,
+		MaxWalFiles:          DefaultMaxWALs,
+		Name:                 DefaultName,
+		SnapCount:            etcdserver.DefaultSnapCount,
+		MaxTxnOps:            DefaultMaxTxnOps,
+		MaxRequestBytes:      DefaultMaxRequestBytes,
+		GRPCKeepAliveMinTime: 5 * time.Second,
+		TickMs:               100,
+		ElectionMs:           1000,
+		LPUrls:               []url.URL{*lpurl},
+		LCUrls:               []url.URL{*lcurl},
+		APUrls:               []url.URL{*apurl},
+		ACUrls:               []url.URL{*acurl},
+		ClusterState:         ClusterStateFlagNew,
+		InitialClusterToken:  "etcd-cluster",
+		StrictReconfigCheck:  true,
+		Metrics:              "basic",
+		EnableV2:             true,
+		AuthToken:            "simple",
 	}
 	cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
 	return cfg
diff --git a/embed/etcd.go b/embed/etcd.go
index e69adbfd63b..f31fd05dd4e 100644
--- a/embed/etcd.go
+++ b/embed/etcd.go
@@ -25,6 +25,9 @@ import (
 	"sync"
 	"time"
 
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
+
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/etcdhttp"
 	"github.com/coreos/etcd/etcdserver/api/v2http"
@@ -397,9 +400,24 @@ func (e *Etcd) serve() (err error) {
 	}
 	h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo})
 
+	gopts := []grpc.ServerOption{}
+	if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
+		gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+			MinTime:             e.cfg.GRPCKeepAliveMinTime,
+			PermitWithoutStream: false,
+		}))
+	}
+	if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
+		e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
+		gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
+			Time:    e.cfg.GRPCKeepAliveInterval,
+			Timeout: e.cfg.GRPCKeepAliveTimeout,
+		}))
+	}
+
 	for _, sctx := range e.sctxs {
 		go func(s *serveCtx) {
-			e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler))
+			e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
 		}(sctx)
 	}
diff --git a/embed/serve.go b/embed/serve.go
index 3e9b37ea070..e5cfa6b7d20 100644
--- a/embed/serve.go
+++ b/embed/serve.go
@@ -66,7 +66,12 @@ func newServeCtx() *serveCtx {
 // serve accepts incoming connections on the listener l,
 // creating a new service goroutine for each. The service goroutines
 // read requests and then call handler to reply to them.
-func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo, handler http.Handler, errHandler func(error)) error {
+func (sctx *serveCtx) serve(
+	s *etcdserver.EtcdServer,
+	tlsinfo *transport.TLSInfo,
+	handler http.Handler,
+	errHandler func(error),
+	gopts ...grpc.ServerOption) error {
 	logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
 	<-s.ReadyNotify()
 	plog.Info("ready to serve client requests")
@@ -77,7 +82,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo
 
 	servLock := v3lock.NewLockServer(v3c)
 	if sctx.insecure {
-		gs := v3rpc.Server(s, nil)
+		gs := v3rpc.Server(s, nil, gopts...)
 		sctx.grpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
@@ -111,7 +116,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo
 		if tlsErr != nil {
 			return tlsErr
 		}
-		gs := v3rpc.Server(s, tlscfg)
+		gs := v3rpc.Server(s, tlscfg, gopts...)
 		sctx.grpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
diff --git a/etcdmain/config.go b/etcdmain/config.go
index 4bc900bc1ed..f911d847016 100644
--- a/etcdmain/config.go
+++ b/etcdmain/config.go
@@ -24,6 +24,7 @@ import (
 	"os"
 	"runtime"
 	"strings"
+	"time"
 
 	"github.com/coreos/etcd/embed"
 	"github.com/coreos/etcd/pkg/flags"
@@ -143,6 +144,9 @@ func newConfig() *config {
 	fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
 	fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum number of operations permitted in a transaction.")
 	fs.UintVar(&cfg.MaxRequestBytes, "max-request-bytes", cfg.MaxRequestBytes, "Maximum client request size in bytes the server will accept.")
+	fs.DurationVar(&cfg.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.Config.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.")
+	fs.DurationVar(&cfg.GRPCKeepAliveInterval, "grpc-keepalive-interval", time.Duration(0), "Frequency duration of server-to-client ping to check if a connection is alive.")
+	fs.DurationVar(&cfg.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", time.Duration(0), "Additional duration of wait before closing a non-responsive connection.")
 
 	// clustering
 	fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
diff --git a/etcdmain/help.go b/etcdmain/help.go
index ad4d30240ee..e1b6956a271 100644
--- a/etcdmain/help.go
+++ b/etcdmain/help.go
@@ -54,6 +54,12 @@ member flags:
 		time (in milliseconds) of a heartbeat interval.
 	--election-timeout '1000'
 		time (in milliseconds) for an election to timeout. See tuning documentation for details.
+	--grpc-keepalive-min-time '5s'
+		minimum duration interval that a client should wait before pinging server.
+	--grpc-keepalive-interval '0s'
+		frequency duration of server-to-client ping to check if a connection is alive.
+	--grpc-keepalive-timeout '0s'
+		additional duration of wait before closing a non-responsive connection.
 	--listen-peer-urls 'http://localhost:2380'
 		list of URLs to listen on for peer traffic.
 	--listen-client-urls 'http://localhost:2379'
diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go
index 19943ff52d5..7225b46eea0 100644
--- a/etcdserver/api/v3rpc/grpc.go
+++ b/etcdserver/api/v3rpc/grpc.go
@@ -37,7 +37,7 @@ func init() {
 	grpclog.SetLogger(plog)
 }
 
-func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
+func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
 	var opts []grpc.ServerOption
 	opts = append(opts, grpc.CustomCodec(&codec{}))
 	if tls != nil {
@@ -47,7 +47,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
 	opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
 	opts = append(opts, grpc.MaxMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
 	opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
-	grpcServer := grpc.NewServer(opts...)
+	grpcServer := grpc.NewServer(append(opts, gopts...)...)
 
 	pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
 	pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
diff --git a/integration/bridge.go b/integration/bridge.go
index b9e67318e52..54ca11ffe1e 100644
--- a/integration/bridge.go
+++ b/integration/bridge.go
@@ -17,6 +17,7 @@ package integration
 import (
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net"
 	"sync"
 
@@ -31,9 +32,10 @@ type bridge struct {
 	l     net.Listener
 	conns map[*bridgeConn]struct{}
 
-	stopc  chan struct{}
-	pausec chan struct{}
-	wg     sync.WaitGroup
+	stopc      chan struct{}
+	pausec     chan struct{}
+	blackholec chan struct{}
+	wg         sync.WaitGroup
 
 	mu sync.Mutex
 }
@@ -41,11 +43,12 @@ func newBridge(addr string) (*bridge, error) {
 	b := &bridge{
 		// bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number
-		inaddr:  addr + "0",
-		outaddr: addr,
-		conns:   make(map[*bridgeConn]struct{}),
-		stopc:   make(chan struct{}),
-		pausec:  make(chan struct{}),
+		inaddr:     addr + "0",
+		outaddr:    addr,
+		conns:      make(map[*bridgeConn]struct{}),
+		stopc:      make(chan struct{}),
+		pausec:     make(chan struct{}),
+		blackholec: make(chan struct{}),
 	}
 	close(b.pausec)
@@ -152,12 +155,12 @@ func (b *bridge) serveConn(bc *bridgeConn) {
 	var wg sync.WaitGroup
 	wg.Add(2)
 	go func() {
-		io.Copy(bc.out, bc.in)
+		ioCopy(bc.out, bc.in, b.blackholec)
 		bc.close()
 		wg.Done()
 	}()
 	go func() {
-		io.Copy(bc.in, bc.out)
+		ioCopy(bc.in, bc.out, b.blackholec)
 		bc.close()
 		wg.Done()
 	}()
@@ -179,3 +182,43 @@ func (bc *bridgeConn) close() {
 	bc.in.Close()
 	bc.out.Close()
 }
+
+func (b *bridge) Blackhole() {
+	b.mu.Lock()
+	close(b.blackholec)
+	b.mu.Unlock()
+}
+
+func (b *bridge) Unblackhole() {
+	b.mu.Lock()
+	b.blackholec = make(chan struct{})
+	b.mu.Unlock()
+}
+
+// https://github.com/golang/go/blob/master/src/io/io.go copyBuffer
+func ioCopy(dst io.Writer, src io.Reader, blackholec chan struct{}) (err error) {
+	buf := make([]byte, 32*1024)
+	for {
+		select {
+		case <-blackholec:
+			io.Copy(ioutil.Discard, src)
+			return nil
+		default:
+		}
+		nr, er := src.Read(buf)
+		if nr > 0 {
+			nw, ew := dst.Write(buf[0:nr])
+			if ew != nil {
+				return ew
+			}
+			if nr != nw {
+				return io.ErrShortWrite
+			}
+		}
+		if er != nil {
+			err = er
+			break
+		}
+	}
+	return
+}
diff --git a/integration/cluster.go b/integration/cluster.go
index ed245eca2f2..5263d284043 100644
--- a/integration/cluster.go
+++ b/integration/cluster.go
@@ -33,6 +33,7 @@ import (
 
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/clientv3"
@@ -89,14 +90,17 @@ var (
 )
 
 type ClusterConfig struct {
-	Size              int
-	PeerTLS           *transport.TLSInfo
-	ClientTLS         *transport.TLSInfo
-	DiscoveryURL      string
-	UseGRPC           bool
-	QuotaBackendBytes int64
-	MaxTxnOps         uint
-	MaxRequestBytes   uint
+	Size                  int
+	PeerTLS               *transport.TLSInfo
+	ClientTLS             *transport.TLSInfo
+	DiscoveryURL          string
+	UseGRPC               bool
+	QuotaBackendBytes     int64
+	MaxTxnOps             uint
+	MaxRequestBytes       uint
+	GRPCKeepAliveMinTime  time.Duration
+	GRPCKeepAliveInterval time.Duration
+	GRPCKeepAliveTimeout  time.Duration
 }
 
 type cluster struct {
@@ -224,12 +228,15 @@ func (c *cluster) HTTPMembers() []client.Member {
 
 func (c *cluster) mustNewMember(t *testing.T) *member {
 	m := mustNewMember(t, memberConfig{
-		name:              c.name(rand.Int()),
-		peerTLS:           c.cfg.PeerTLS,
-		clientTLS:         c.cfg.ClientTLS,
-		quotaBackendBytes: c.cfg.QuotaBackendBytes,
-		maxTxnOps:         c.cfg.MaxTxnOps,
-		maxRequestBytes:   c.cfg.MaxRequestBytes,
+		name:                  c.name(rand.Int()),
+		peerTLS:               c.cfg.PeerTLS,
+		clientTLS:             c.cfg.ClientTLS,
+		quotaBackendBytes:     c.cfg.QuotaBackendBytes,
+		maxTxnOps:             c.cfg.MaxTxnOps,
+		maxRequestBytes:       c.cfg.MaxRequestBytes,
+		grpcKeepAliveMinTime:  c.cfg.GRPCKeepAliveMinTime,
+		grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
+		grpcKeepAliveTimeout:  c.cfg.GRPCKeepAliveTimeout,
 	})
 	m.DiscoveryURL = c.cfg.DiscoveryURL
 	if c.cfg.UseGRPC {
@@ -479,9 +486,10 @@ type member struct {
 	s   *etcdserver.EtcdServer
 	hss []*httptest.Server
 
-	grpcServer *grpc.Server
-	grpcAddr   string
-	grpcBridge *bridge
+	grpcServerOpts []grpc.ServerOption
+	grpcServer     *grpc.Server
+	grpcAddr       string
+	grpcBridge     *bridge
 
 	// serverClient is a clientv3 that directly calls the etcdserver.
 	serverClient *clientv3.Client
@@ -492,12 +500,15 @@ type member struct {
 func (m *member) GRPCAddr() string { return m.grpcAddr }
 
 type memberConfig struct {
-	name              string
-	peerTLS           *transport.TLSInfo
-	clientTLS         *transport.TLSInfo
-	quotaBackendBytes int64
-	maxTxnOps         uint
-	maxRequestBytes   uint
+	name                  string
+	peerTLS               *transport.TLSInfo
+	clientTLS             *transport.TLSInfo
+	quotaBackendBytes     int64
+	maxTxnOps             uint
+	maxRequestBytes       uint
+	grpcKeepAliveMinTime  time.Duration
+	grpcKeepAliveInterval time.Duration
+	grpcKeepAliveTimeout  time.Duration
 }
 
 // mustNewMember return an inited member with the given name. If peerTLS is
@@ -554,6 +565,21 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
 		m.MaxRequestBytes = embed.DefaultMaxRequestBytes
 	}
 	m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough
+
+	m.grpcServerOpts = []grpc.ServerOption{}
+	if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
+		m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+			MinTime:             mcfg.grpcKeepAliveMinTime,
+			PermitWithoutStream: false,
+		}))
+	}
+	if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
+		mcfg.grpcKeepAliveTimeout > time.Duration(0) {
+		m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
+			Time:    mcfg.grpcKeepAliveInterval,
+			Timeout: mcfg.grpcKeepAliveTimeout,
+		}))
+	}
 	return m
 }
@@ -584,6 +610,8 @@ func (m *member) ID() types.ID { return m.s.ID() }
 func (m *member) DropConnections()    { m.grpcBridge.Reset() }
 func (m *member) PauseConnections()   { m.grpcBridge.Pause() }
 func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
+func (m *member) Blackhole()          { m.grpcBridge.Blackhole() }
+func (m *member) Unblackhole()        { m.grpcBridge.Unblackhole() }
 
 // NewClientV3 creates a new grpc client connection to the member
 func NewClientV3(m *member) (*clientv3.Client, error) {
@@ -693,7 +721,7 @@ func (m *member) Launch() error {
 			return err
 		}
 	}
-	m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
 	m.serverClient = v3client.New(m.s)
 	lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
 	epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
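
Not part of the patch: a minimal client-side sketch of how the keepalive-aware balancer is enabled. Setting DialKeepAliveTime to a non-zero value turns on the balancer's keepAlive path added above, and DialKeepAliveTimeout bounds how long an unanswered ping waits before the endpoint is marked inactive and skipped by notifyAddrs. The endpoint addresses and durations below are placeholders, not values from this change.

package main

import (
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
)

func main() {
	// Endpoints are placeholders; any reachable cluster members work.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:            []string{"10.0.0.1:2379", "10.0.0.2:2379"},
		DialKeepAliveTime:    10 * time.Second, // >0 enables the balancer's keepAlive handling
		DialKeepAliveTimeout: 3 * time.Second,  // give up on an unanswered ping after this long
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// A watch keeps working after one member is blackholed because the
	// balancer stops notifying gRPC about endpoints marked inactive.
	wch := cli.Watch(context.Background(), "foo")
	for resp := range wch {
		for _, ev := range resp.Events {
			log.Printf("%s %q", ev.Type, ev.Kv.Key)
		}
	}
}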
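
Likewise, a server-side sketch (also not part of the patch) of the new embed.Config fields, mirroring how embed/etcd.go translates them into keepalive.EnforcementPolicy and keepalive.ServerParameters; the data directory and durations are arbitrary illustration values.

package main

import (
	"log"
	"time"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // arbitrary data directory

	// Reject client pings arriving faster than every 5s (too_many_pings otherwise),
	// ping clients every 2h, and close connections unresponsive for another 20s.
	cfg.GRPCKeepAliveMinTime = 5 * time.Second
	cfg.GRPCKeepAliveInterval = 2 * time.Hour
	cfg.GRPCKeepAliveTimeout = 20 * time.Second

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	log.Fatal(<-e.Err())
}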