-
Notifications
You must be signed in to change notification settings - Fork 9.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: use gRPC server GracefulStop #7743
Conversation
31597ea
to
6e207e4
Compare
This should fix the inflight op crashes, right? Can there be a test? |
@heyitsanthony Yes, I will try to verify this fixes that problem by adding tests or reproduce. |
32d93e3
to
6686634
Compare
@heyitsanthony Test added. Confirmed that it fixes the issue (use |
embed/serve.go
Outdated
@@ -52,11 +52,12 @@ type serveCtx struct { | |||
|
|||
userHandlers map[string]http.Handler | |||
serviceRegister func(*grpc.Server) | |||
stopGRPCc chan func() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
grpcServers []*gprc.Server
embed/serve.go
Outdated
@@ -74,6 +75,12 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle | |||
|
|||
if sctx.insecure { | |||
gs := v3rpc.Server(s, nil) | |||
sctx.stopGRPCc <- func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sctx.grpcServers = sctx.append(sctx.grpcServers, gs)
embed/serve.go
Outdated
@@ -103,6 +110,12 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle | |||
|
|||
if sctx.secure { | |||
gs := v3rpc.Server(s, tlscfg) | |||
sctx.stopGRPCc <- func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sctx.grpcServers = sctx.append(sctx.grpcServers, gs)
etcdserver/config.go
Outdated
@@ -61,6 +61,12 @@ type ServerConfig struct { | |||
ClientCertAuthEnabled bool | |||
|
|||
AuthToken string | |||
|
|||
// OnShutdown gracefully stops gRPC server on shutdown. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be a generic thing instead of talking about grpc, etc
// OnShutdown is called immediately before releasing etcd server resources.
integration/v3_grpc_inflight_test.go
Outdated
@@ -75,3 +76,38 @@ func TestV3MaintenanceDefragmentInflightRange(t *testing.T) { | |||
|
|||
<-donec | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there were some changes to the mvcc code so TestV3MaintenanceHashInflight
would appear to work. Namely, TestStoreHashAfterForceCommit
and the stopc
logic in Hash
should probably be removed.
integration/v3_grpc_inflight_test.go
Outdated
kvc := toGRPC(cli).KV | ||
|
||
if _, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { | ||
panic(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
t.Fatal(err)
embed/etcd.go
Outdated
@@ -137,6 +139,11 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { | |||
if err = e.serve(); err != nil { | |||
return | |||
} | |||
e.Server.Cfg.OnShutdown = func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OnShutdown = func() {
for _, sctx := range e.sctxs {
for _, gs := range sctx.grpcServers {
gs.GracefulStop()
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this does not sync with serve
routine? We populate sctx.grpcServers
in (e *Etcd) serve()
(which calls (sctx *serveCtx) serve
that creates *grpc.Server
.
But (e *Etcd) serve()
returns calling (sctx *serveCtx) serve
in goroutines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm, we somehow need a way to sync that slice anyway.
embed/etcd.go
Outdated
@@ -343,6 +350,10 @@ func (e *Etcd) serve() (err error) { | |||
} | |||
|
|||
func (e *Etcd) errHandler(err error) { | |||
if transport.IsClosedConnError(err) || err == grpc.ErrServerStopped { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this necessary? shouldn't stopc
be closed before calling etcdserver.Stop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// in embed/etcd.go
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if err = inCfg.Validate(); err != nil {
return nil, err
}
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
cfg := &e.cfg
defer func() {
if e != nil && err != nil {
e.Close()
e = nil
}
}()
We close stopc
by calling e.Close()
here, but StartEtcd
returns with nil
error, so it is not called in our use case?
etcdserver/server.go
Outdated
// stop accepting new connections, RPCs, | ||
// and blocks until all pending RPCs are finished | ||
if s.Cfg != nil && s.Cfg.OnShutdown != nil { | ||
s.Cfg.OnShutdown() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's possible to avoid having this in Cfg entirely-- this function could be called prior to calling HardStop/Stop; similar to how the listeners are closed in embed.Etcd.Close() before calling Server.Stop()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok let me re-organize the code.
f62b98b
to
6779d6d
Compare
I think we still need // etcdmain/etcd.go
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
if cfg.Metrics == "extensive" {
grpc_prometheus.EnableHandlingTimeHistogram()
}
e, err := embed.StartEtcd(cfg)
if err != nil {
return nil, nil, err
}
osutil.RegisterInterruptHandler(e.Server.Stop) |
why not have |
6779d6d
to
bb673bf
Compare
bb673bf
to
e2d0db9
Compare
d8ebdd3
to
5aea157
Compare
Codecov Report
@@ Coverage Diff @@
## master #7743 +/- ##
=========================================
Coverage ? 75.73%
=========================================
Files ? 331
Lines ? 26058
Branches ? 0
=========================================
Hits ? 19735
Misses ? 4899
Partials ? 1424
Continue to review full report at Codecov.
|
embed/etcd.go
Outdated
@@ -147,6 +161,7 @@ func (e *Etcd) Config() Config { | |||
|
|||
func (e *Etcd) Close() { | |||
e.closeOnce.Do(func() { close(e.stopc) }) | |||
e.OnShutdown() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can the function be inlined here instead of needing a separate OnShutdown
field?
embed/etcd.go
Outdated
// RPCs, and blocks until all pending RPCs are finished | ||
for _, sctx := range e.sctxs { | ||
for gs := range sctx.grpcServerC { | ||
plog.Warning("gracefully stopping gRPC server") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't warn / print anything? this should be part of the normal shutdown process
} | ||
// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)', | ||
// which initializes *bolt.Tx.db and *bolt.Tx.meta as nil; panics t.tx.Size(). | ||
// Server must make sure 'batchTx.commit(false)' does not follow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably shouldn't mention the etcd server or gRPC. The contract is independent of all that-- don't have any operations inflight when closing the backend.
integration/v3_grpc_inflight_test.go
Outdated
mvc := toGRPC(cli).Maintenance | ||
mvc.Defragment(context.Background(), &pb.DefragmentRequest{}) | ||
// simulate 'embed.Etcd.Close()' with '*grpc.Server.GracefulStop' | ||
clus.Members[0].grpcServer.GracefulStop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clus.Members[0].Stop()
@@ -518,20 +518,6 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte { | |||
return bytes | |||
} | |||
|
|||
// TestStoreHashAfterForceCommit ensures that later Hash call to | |||
// closed backend with ForceCommit does not panic. | |||
func TestStoreHashAfterForceCommit(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also remove the select
in mvcc.store.Hash
, which was faking this
Signed-off-by: Gyu-Ho Lee <[email protected]>
Fix etcd-io#7322. Signed-off-by: Gyu-Ho Lee <[email protected]>
- Test etcd-io#7322. - Remove test case added in etcd-io#6662. Signed-off-by: Gyu-Ho Lee <[email protected]>
This reverts commit 994e8e4. Since now etcdserver gracefully shuts down the gRPC server
Revert etcd-io#6662. Signed-off-by: Gyu-Ho Lee <[email protected]>
Revert change in etcd-io@33acbb6. Signed-off-by: Gyu-Ho Lee <[email protected]>
5aea157
to
5000d29
Compare
@gyuho is there a workaround in early versions? |
Example output
Fix #7322.