Skip to content

Commit

Permalink
embed: wait up to election-timeout for pending RPCs when closing
Browse files Browse the repository at this point in the history
Both grpc.Server.Stop and grpc.Server.GracefulStop close the listeners
first, to stop accepting the new connections. GracefulStop blocks until
all clients close their open transports(connections). Unary RPCs
only take a few seconds to finish. Stream RPCs, like watch, might never
close the connections from client side, thus making gRPC server wait
forever.

This patch still calls GracefulStop, but waits up to 10s before manually
closing the open transports.

Address #8224.

Signed-off-by: Gyu-Ho Lee <[email protected]>
  • Loading branch information
gyuho committed Jul 14, 2017
1 parent 511f4d5 commit 905865b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
22 changes: 19 additions & 3 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,27 @@ func (e *Etcd) Config() Config {
func (e *Etcd) Close() {
e.closeOnce.Do(func() { close(e.stopc) })

// (gRPC server) stops accepting new connections,
// RPCs, and blocks until all pending RPCs are finished
// 2s for RPC finish time
timeout := 2*time.Second + e.Server.Cfg.ElectionTimeout()
for _, sctx := range e.sctxs {
for gs := range sctx.grpcServerC {
gs.GracefulStop()
ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
}()
// wait until all pending RPCs are finished
select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (c *ServerConfig) ReqTimeout() time.Duration {
return 5*time.Second + 2*time.Duration(c.ElectionTicks)*time.Duration(c.TickMs)*time.Millisecond
}

func (c *ServerConfig) electionTimeout() time.Duration {
func (c *ServerConfig) ElectionTimeout() time.Duration {
return time.Duration(c.ElectionTicks) * time.Duration(c.TickMs) * time.Millisecond
}

Expand Down
2 changes: 1 addition & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,7 +1313,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
// promote lessor when the local member is leader and finished
// applying all entries from the last term.
if s.isLeader() {
s.lessor.Promote(s.Cfg.electionTimeout())
s.lessor.Promote(s.Cfg.ElectionTimeout())
}
return
}
Expand Down

0 comments on commit 905865b

Please sign in to comment.