Skip to content

Commit

Permalink
Bin/pick primary path change and tso server close stuck fix (tikv#101)
Browse files Browse the repository at this point in the history
* Fix tso server close stuck issue (tikv#6529)

ref tikv#5895, close tikv#6304

Rewrite TSO gPRC/HTTP server Close().

Signed-off-by: Bin Shi <[email protected]>

* mcs, tso: change keyspace group primary path. (tikv#6526)

ref tikv#5895

mcs, tso: change keyspace group primary path.

The path for non-default keyspace group primary election changes
from  "/ms/{cluster_id}/tso/{group}/primary" to "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
Default keyspace group keeps /ms/{cluster_id}/tso/00000/primary.

Signed-off-by: Bin Shi <[email protected]>

---------

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored May 29, 2023
1 parent e7be1de commit 9e1e2de
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 129 deletions.
150 changes: 102 additions & 48 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ const (
// tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes.
// format: "/ms/{cluster_id}/tso".
tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName

// maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID.
maxRetryTimesWaitAPIService = 360
// retryIntervalWaitAPIService is the interval to retry.
Expand Down Expand Up @@ -107,7 +106,11 @@ type Server struct {
// http client
httpClient *http.Client

secure bool
muxListener net.Listener
httpListener net.Listener
grpcServer *grpc.Server
httpServer *http.Server
service *Service
keyspaceGroupManager *tso.KeyspaceGroupManager
// Store as map[string]*grpc.ClientConn
Expand Down Expand Up @@ -188,6 +191,8 @@ func (s *Server) Close() {
// close tso service loops in the keyspace group manager
s.keyspaceGroupManager.Close()
s.serviceRegister.Deregister()
s.stopHTTPServer()
s.stopGRPCServer()
s.muxListener.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()
Expand Down Expand Up @@ -390,83 +395,128 @@ func (s *Server) startGRPCServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

gs := grpc.NewServer()
s.service.RegisterGRPCService(gs)
diagnosticspb.RegisterDiagnosticsServer(gs, s)
serverr := gs.Serve(l)
log.Info("grpc server stopped serving")

// Attempt graceful stop (waits for pending RPCs), but force a stop if
// it doesn't happen in a reasonable amount of time.
done := make(chan struct{})
go func() {
defer logutil.LogPanic()
log.Info("try to gracefully stop the server now")
gs.GracefulStop()
close(done)
}()
select {
case <-done:
case <-time.After(mcsutils.DefaultGRPCGracefulStopTimeout):
log.Info("stopping grpc gracefully is taking longer than expected and force stopping now")
gs.Stop()
}

log.Info("grpc server starts serving", zap.String("address", l.Addr().String()))
err := s.grpcServer.Serve(l)
if s.IsClosed() {
log.Info("grpc server stopped")
} else {
log.Fatal("grpc server stopped unexpectedly", errs.ZapError(serverr))
log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err))
}
}

func (s *Server) startHTTPServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

handler, _ := SetUpRestHandler(s.service)
hs := &http.Server{
Handler: handler,
ReadTimeout: 5 * time.Minute,
ReadHeaderTimeout: 5 * time.Second,
}
serverr := hs.Serve(l)
log.Info("http server stopped serving")

ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultHTTPGracefulShutdownTimeout)
defer cancel()
if err := hs.Shutdown(ctx); err != nil {
log.Error("http server shutdown encountered problem", errs.ZapError(err))
} else {
log.Info("all http(s) requests finished")
}
log.Info("http server starts serving", zap.String("address", l.Addr().String()))
err := s.httpServer.Serve(l)
if s.IsClosed() {
log.Info("http server stopped")
} else {
log.Fatal("http server stopped unexpectedly", errs.ZapError(serverr))
log.Fatal("http server stopped unexpectedly", errs.ZapError(err))
}
}

func (s *Server) startGRPCAndHTTPServers(l net.Listener) {
func (s *Server) startGRPCAndHTTPServers(serverReadyChan chan<- struct{}, l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

mux := cmux.New(l)
// Don't hang on matcher after closing listener
mux.SetReadTimeout(3 * time.Second)
grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpL := mux.Match(cmux.Any())
if s.secure {
s.httpListener = mux.Match(cmux.Any())
} else {
s.httpListener = mux.Match(cmux.HTTP1())
}

s.serverLoopWg.Add(2)
s.grpcServer = grpc.NewServer()
s.service.RegisterGRPCService(s.grpcServer)
diagnosticspb.RegisterDiagnosticsServer(s.grpcServer, s)
s.serverLoopWg.Add(1)
go s.startGRPCServer(grpcL)
go s.startHTTPServer(httpL)

handler, _ := SetUpRestHandler(s.service)
s.httpServer = &http.Server{
Handler: handler,
ReadTimeout: 3 * time.Second,
}
s.serverLoopWg.Add(1)
go s.startHTTPServer(s.httpListener)

serverReadyChan <- struct{}{}
if err := mux.Serve(); err != nil {
if s.IsClosed() {
log.Info("mux stop serving", errs.ZapError(err))
log.Info("mux stopped serving", errs.ZapError(err))
} else {
log.Panic("mux stop serving unexpectedly", errs.ZapError(err))
log.Fatal("mux stopped serving unexpectedly", errs.ZapError(err))
}
}
}

func (s *Server) stopHTTPServer() {
log.Info("stopping http server")
defer log.Info("http server stopped")

ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultHTTPGracefulShutdownTimeout)
defer cancel()

// First, try to gracefully shutdown the http server
ch := make(chan struct{})
go func() {
defer close(ch)
s.httpServer.Shutdown(ctx)
}()

select {
case <-ch:
case <-ctx.Done():
// Took too long, manually close open transports
log.Warn("http server graceful shutdown timeout, forcing close")
s.httpServer.Close()
// concurrent Graceful Shutdown should be interrupted
<-ch
}
}

func (s *Server) stopGRPCServer() {
log.Info("stopping grpc server")
defer log.Info("grpc server stopped")

// Do not grpc.Server.GracefulStop with TLS enabled etcd server
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
// and https://github.com/etcd-io/etcd/issues/8916
if s.secure {
s.grpcServer.Stop()
return
}

ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultGRPCGracefulStopTimeout)
defer cancel()

// First, try to gracefully shutdown the grpc server
ch := make(chan struct{})
go func() {
defer close(ch)
// Close listeners to stop accepting new connections,
// will block on any existing transports
s.grpcServer.GracefulStop()
}()

// Wait until all pending RPCs are finished
select {
case <-ch:
case <-ctx.Done():
// Took too long, manually close open transports
// e.g. watch streams
log.Warn("grpc server graceful shutdown timeout, forcing close")
s.grpcServer.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
}

func (s *Server) startServer() (err error) {
if s.clusterID, err = mcsutils.InitClusterID(s.ctx, s.etcdClient); err != nil {
return err
Expand Down Expand Up @@ -503,6 +553,7 @@ func (s *Server) startServer() (err error) {
return err
}
if tlsConfig != nil {
s.secure = true
s.muxListener, err = tls.Listen(mcsutils.TCPNetworkStr, s.listenURL.Host, tlsConfig)
} else {
s.muxListener, err = net.Listen(mcsutils.TCPNetworkStr, s.listenURL.Host)
Expand All @@ -511,8 +562,11 @@ func (s *Server) startServer() (err error) {
return err
}

serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
s.serverLoopWg.Add(1)
go s.startGRPCAndHTTPServers(s.muxListener)
go s.startGRPCAndHTTPServers(serverReadyChan, s.muxListener)
<-serverReadyChan

// Run callbacks
log.Info("triggering the start callback functions")
Expand Down
4 changes: 4 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,16 @@ const (
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)

// MicroserviceKey is the key of microservice.
MicroserviceKey = "ms"
// APIServiceName is the name of api server.
APIServiceName = "api"
// TSOServiceName is the name of tso server.
TSOServiceName = "tso"
// ResourceManagerServiceName is the name of resource manager server.
ResourceManagerServiceName = "resource_manager"
// KeyspaceGroupsKey is the path component of keyspace groups.
KeyspaceGroupsKey = "keyspace_groups"

// MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso
// is the sharding unit, i.e., by the definition here, the max count of the shards
Expand Down
23 changes: 6 additions & 17 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ const (
resourceGroupStatesPath = "states"
requestUnitConfigPath = "ru_config"
// tso storage endpoint has prefix `tso`
microserviceKey = "ms"
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"

tsoKeyspaceGroupPrefix = "tso/keyspace_groups"
tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupMembershipKey = "membership"

// we use uint64 to represent ID, the max length of uint64 is 20.
Expand Down Expand Up @@ -236,20 +235,10 @@ func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
}

// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains
// the pattern of `tso/keyspace_groups/membership/(\d{5})$`.
func ExtractKeyspaceGroupIDFromPath(path string) (uint32, error) {
// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id.
func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp {
pattern := strings.Join([]string{KeyspaceGroupIDPrefix(), `(\d{5})$`}, "/")
re := regexp.MustCompile(pattern)
match := re.FindStringSubmatch(path)
if match == nil {
return 0, fmt.Errorf("invalid keyspace group id path: %s", path)
}
id, err := strconv.ParseUint(match[1], 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err)
}
return uint32(id), nil
return regexp.MustCompile(pattern)
}

// encodeKeyspaceGroupID from uint32 to string.
Expand Down
48 changes: 0 additions & 48 deletions pkg/storage/endpoint/key_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,51 +27,3 @@ func BenchmarkRegionPath(b *testing.B) {
_ = RegionPath(uint64(i))
}
}

func TestExtractKeyspaceGroupIDFromPath(t *testing.T) {
re := require.New(t)

rightCases := []struct {
path string
id uint32
}{
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999},
{path: "tso/keyspace_groups/membership/00000", id: 0},
{path: "tso/keyspace_groups/membership/00001", id: 1},
{path: "tso/keyspace_groups/membership/12345", id: 12345},
{path: "tso/keyspace_groups/membership/99999", id: 99999},
}

for _, tt := range rightCases {
id, err := ExtractKeyspaceGroupIDFromPath(tt.path)
re.Equal(tt.id, id)
re.NoError(err)
}

wrongCases := []struct {
path string
}{
{path: ""},
{path: "00001"},
{path: "xxx/keyspace_groups/membership/00001"},
{path: "tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"},
{path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"},
}

for _, tt := range wrongCases {
_, err := ExtractKeyspaceGroupIDFromPath(tt.path)
re.Error(err)
}
}
Loading

0 comments on commit 9e1e2de

Please sign in to comment.