Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdmain: configure keep-alive policy from server #8258

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 58 additions & 10 deletions clientv3/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net/url"
"strings"
"sync"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
Expand All @@ -32,8 +33,10 @@ var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address avai
// simpleBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type simpleBalancer struct {
// addrs are the client's endpoints for grpc
addrs []grpc.Address
keepAlive bool
// addrs are the client's endpoints for grpc,
// mapped to connection activity status
addrs map[grpc.Address]addrConn
// notifyCh notifies grpc of the set of addresses for connecting
notifyCh chan []grpc.Address

Expand Down Expand Up @@ -73,9 +76,9 @@ type simpleBalancer struct {

func newSimpleBalancer(eps []string) *simpleBalancer {
notifyCh := make(chan []grpc.Address, 1)
addrs := make([]grpc.Address, len(eps))
addrs := make(map[grpc.Address]addrConn, len(eps))
for i := range eps {
addrs[i].Addr = getHost(eps[i])
addrs[grpc.Address{Addr: getHost(eps[i])}] = addrConn{active: true, last: time.Now()}
}
sb := &simpleBalancer{
addrs: addrs,
Expand Down Expand Up @@ -136,9 +139,9 @@ func (b *simpleBalancer) updateAddrs(eps []string) {

b.host2ep = np

addrs := make([]grpc.Address, 0, len(eps))
addrs := make(map[grpc.Address]addrConn, len(eps))
for i := range eps {
addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
addrs[grpc.Address{Addr: getHost(eps[i])}] = addrConn{active: true, last: time.Now()}
}
b.addrs = addrs

Expand All @@ -156,15 +159,38 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
}
}

func hasAddr(addrs []grpc.Address, targetAddr string) bool {
for _, addr := range addrs {
// hasAddr reports whether any key in addrs carries the endpoint
// string targetAddr. A linear scan is used because callers only hold
// the Addr string, not a full grpc.Address key.
func hasAddr(addrs map[grpc.Address]addrConn, targetAddr string) bool {
	for a := range addrs {
		if a.Addr == targetAddr {
			return true
		}
	}
	return false
}

// addrConn records the balancer's view of a single endpoint's
// connection activity.
type addrConn struct {
	// active is false while the endpoint is considered unreachable
	// (e.g. torn down after a keepalive timeout); inactive endpoints
	// are withheld from gRPC in notifyAddrs when keepAlive is on.
	active bool
	// last is the time the address was most recently marked active.
	last time.Time
}

// setActive updates the activity status of targetAddr in addrs.
// It returns false when targetAddr is not present in addrs, or when
// the address was marked inactive less than a minute ago — a cool-down
// that keeps a keepalive-timed-out endpoint from being retried
// immediately.
// NOTE(review): the result is named 'down' but is true on a successful
// state change regardless of direction — confirm intent or rename.
func setActive(addrs map[grpc.Address]addrConn, targetAddr string, active bool) (down bool) {
	for addr, v := range addrs {
		if targetAddr == addr.Addr {
			// TODO: configure interval
			// NOTE(review): this cool-down also rejects a repeated
			// deactivate inside the window — confirm that is intended.
			if !v.active && time.Since(v.last) < time.Minute {
				return false
			}
			ac := addrConn{active: active, last: v.last}
			if active {
				// refresh the timestamp only on (re)activation so that
				// 'last' records the most recent time the address was up
				ac.last = time.Now()
			}
			addrs[addr] = ac
			return true
		}
	}
	return false
}

func (b *simpleBalancer) updateNotifyLoop() {
defer close(b.donec)

Expand Down Expand Up @@ -221,14 +247,24 @@ func (b *simpleBalancer) updateNotifyLoop() {

func (b *simpleBalancer) notifyAddrs() {
b.mu.RLock()
addrs := b.addrs
multi := len(b.addrs) > 1 // if single, retry the only endpoint
addrs := make([]grpc.Address, 0, len(b.addrs))
for addr, ac := range b.addrs {
if b.keepAlive && multi && !ac.active {
continue
}
addrs = append(addrs, addr)
}
b.mu.RUnlock()
select {
case b.notifyCh <- addrs:
case <-b.stopc:
}
}

// Up is called by gRPC client after address connection state
// becomes connectivity.Ready. This is after HTTP/2 client
// establishes the transport.
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.mu.Lock()
defer b.mu.Unlock()
Expand All @@ -244,6 +280,11 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
if !hasAddr(b.addrs, addr.Addr) {
return func(err error) {}
}
if !setActive(b.addrs, addr.Addr, true) { // mark connectivity state as active
// it is possible that Up is called before gRPC receives Notify()
// and tears down keepalive timed-out endpoints
return func(err error) {}
}
if b.pinAddr != "" {
return func(err error) {}
}
Expand All @@ -255,8 +296,15 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
b.mu.Lock()
if b.keepAlive &&
(err.Error() == "grpc: failed with network I/O error" ||
err.Error() == "grpc: the connection is drained") {
// set as connectivity.TransientFailure until next Up
// TODO: undo this when connection is up
setActive(b.addrs, addr.Addr, false)
}
b.upc = make(chan struct{})
close(b.downc)
close(b.downc) // trigger notifyAddrs
b.pinAddr = ""
b.mu.Unlock()
}
Expand Down
4 changes: 4 additions & 0 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ func newClient(cfg *Config) (*Client, error) {
}

client.balancer = newSimpleBalancer(cfg.Endpoints)
client.balancer.mu.Lock()
client.balancer.keepAlive = cfg.DialKeepAliveTime > 0
client.balancer.mu.Unlock()

// use Endpoints[0] so that for https:// without any tls config given, then
// grpc will assume the ServerName is in the endpoint.
conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
Expand Down
72 changes: 72 additions & 0 deletions clientv3/integration/watch_keepalive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !cluster_proxy

package integration

import (
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
"golang.org/x/net/context"
)

// TestWatchKeepAlive ensures that a watch discovers it cannot talk to
// its server and then switches to another endpoint via the client
// keep-alive parameters.
// TODO: test with '-tags cluster_proxy'
func TestWatchKeepAlive(t *testing.T) {
	defer testutil.AfterTest(t)

	// Server pings the client every 5s and closes the connection after
	// an additional 1ms without a response; MinTime of 1ms keeps the
	// server from rejecting the client's own fast pings.
	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
		Size:                  3,
		GRPCKeepAliveMinTime:  time.Millisecond, // avoid too_many_pings
		GRPCKeepAliveInterval: 5 * time.Second,  // server-to-client ping
		GRPCKeepAliveTimeout:  time.Millisecond,
	})
	defer clus.Terminate(t)

	// Two endpoints so the balancer has somewhere to fail over to;
	// client pings every 5s and gives up after 1ns without an ack.
	ccfg := clientv3.Config{
		Endpoints:            []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()},
		DialKeepAliveTime:    5 * time.Second,
		DialKeepAliveTimeout: time.Nanosecond,
	}
	cli, err := clientv3.New(ccfg)
	if err != nil {
		t.Fatal(err)
	}
	defer cli.Close()

	// Establish the watch before cutting the network.
	wch := cli.Watch(context.Background(), "foo", clientv3.WithCreatedNotify())
	if _, ok := <-wch; !ok {
		t.Fatalf("watch failed")
	}
	// Drop all traffic to the watched member without killing it, so
	// only the keepalive ping can detect the dead connection.
	clus.Members[0].Blackhole()

	// expect 'cli' to switch endpoints from keepalive ping
	// give enough time for slow machine
	time.Sleep(ccfg.DialKeepAliveTime + 3*time.Second)

	// Write through another member; the event should arrive on the
	// watch only if the client re-established it on a live endpoint.
	if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
		t.Fatal(err)
	}
	select {
	case <-wch:
	case <-time.After(3 * time.Second):
		t.Fatal("took too long to receive events")
	}
}
58 changes: 39 additions & 19 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/url"
"path/filepath"
"strings"
"time"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/cors"
Expand Down Expand Up @@ -92,6 +93,24 @@ type Config struct {
MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"`

// gRPC server options

// GRPCKeepAliveMinTime is the minimum interval that a client should
// wait before pinging server.
// When client pings "too fast", server sends goaway and closes the
// connection (errors: too_many_pings, http2.ErrCodeEnhanceYourCalm).
// When too slow, nothing happens.
// Server expects client pings only when there are active streams
// (enforced by setting 'PermitWithoutStream' to false).
GRPCKeepAliveMinTime time.Duration `json:"grpc-keepalive-min-time"`
// GRPCKeepAliveInterval is the frequency of server-to-client ping
// to check if a connection is alive. Close a non-responsive connection
// after an additional duration of Timeout.
GRPCKeepAliveInterval time.Duration `json:"grpc-keepalive-interval"`
// GRPCKeepAliveTimeout is the additional duration of wait
// before closing a non-responsive connection.
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`

// clustering

APUrls, ACUrls []url.URL
Expand Down Expand Up @@ -175,25 +194,26 @@ func NewConfig() *Config {
lcurl, _ := url.Parse(DefaultListenClientURLs)
acurl, _ := url.Parse(DefaultAdvertiseClientURLs)
cfg := &Config{
CorsInfo: &cors.CORSInfo{},
MaxSnapFiles: DefaultMaxSnapshots,
MaxWalFiles: DefaultMaxWALs,
Name: DefaultName,
SnapCount: etcdserver.DefaultSnapCount,
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
TickMs: 100,
ElectionMs: 1000,
LPUrls: []url.URL{*lpurl},
LCUrls: []url.URL{*lcurl},
APUrls: []url.URL{*apurl},
ACUrls: []url.URL{*acurl},
ClusterState: ClusterStateFlagNew,
InitialClusterToken: "etcd-cluster",
StrictReconfigCheck: true,
Metrics: "basic",
EnableV2: true,
AuthToken: "simple",
CorsInfo: &cors.CORSInfo{},
MaxSnapFiles: DefaultMaxSnapshots,
MaxWalFiles: DefaultMaxWALs,
Name: DefaultName,
SnapCount: etcdserver.DefaultSnapCount,
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
GRPCKeepAliveMinTime: 5 * time.Second,
TickMs: 100,
ElectionMs: 1000,
LPUrls: []url.URL{*lpurl},
LCUrls: []url.URL{*lcurl},
APUrls: []url.URL{*apurl},
ACUrls: []url.URL{*acurl},
ClusterState: ClusterStateFlagNew,
InitialClusterToken: "etcd-cluster",
StrictReconfigCheck: true,
Metrics: "basic",
EnableV2: true,
AuthToken: "simple",
}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
return cfg
Expand Down
20 changes: 19 additions & 1 deletion embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
"github.com/coreos/etcd/etcdserver/api/v2http"
Expand Down Expand Up @@ -397,9 +400,24 @@ func (e *Etcd) serve() (err error) {
}
h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo})

gopts := []grpc.ServerOption{}
if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: e.cfg.GRPCKeepAliveMinTime,
PermitWithoutStream: false,
}))
}
if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
Time: e.cfg.GRPCKeepAliveInterval,
Timeout: e.cfg.GRPCKeepAliveTimeout,
}))
}

for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler))
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
}(sctx)
}

Expand Down
11 changes: 8 additions & 3 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ func newServeCtx() *serveCtx {
// serve accepts incoming connections on the listener l,
// creating a new service goroutine for each. The service goroutines
// read requests and then call handler to reply to them.
func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo, handler http.Handler, errHandler func(error)) error {
func (sctx *serveCtx) serve(
s *etcdserver.EtcdServer,
tlsinfo *transport.TLSInfo,
handler http.Handler,
errHandler func(error),
gopts ...grpc.ServerOption) error {
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
<-s.ReadyNotify()
plog.Info("ready to serve client requests")
Expand All @@ -77,7 +82,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo
servLock := v3lock.NewLockServer(v3c)

if sctx.insecure {
gs := v3rpc.Server(s, nil)
gs := v3rpc.Server(s, nil, gopts...)
sctx.grpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
Expand Down Expand Up @@ -111,7 +116,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo
if tlsErr != nil {
return tlsErr
}
gs := v3rpc.Server(s, tlscfg)
gs := v3rpc.Server(s, tlscfg, gopts...)
sctx.grpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
Expand Down
4 changes: 4 additions & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"runtime"
"strings"
"time"

"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/pkg/flags"
Expand Down Expand Up @@ -143,6 +144,9 @@ func newConfig() *config {
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum number of operations permitted in a transaction.")
fs.UintVar(&cfg.MaxRequestBytes, "max-request-bytes", cfg.MaxRequestBytes, "Maximum client request size in bytes the server will accept.")
fs.DurationVar(&cfg.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.Config.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.")
fs.DurationVar(&cfg.GRPCKeepAliveInterval, "grpc-keepalive-interval", time.Duration(0), "Frequency duration of server-to-client ping to check if a connection is alive.")
fs.DurationVar(&cfg.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", time.Duration(0), "Additional duration of wait before closing a non-responsive connection.")

// clustering
fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
Expand Down
Loading