From e050e0ec2172c95d784aeb76f89c679265176b70 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 24 Apr 2023 13:39:32 +0800 Subject: [PATCH] mcs: pick some tso forward fix (#6341)(#6279) (#77) * mcs: update client when meet transport is closing (#6341) * mcs: update client when meet transport is closing Signed-off-by: lhy1024 * address comments Signed-off-by: lhy1024 * add retry Signed-off-by: lhy1024 --------- Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Signed-off-by: lhy1024 * mcs: fix watch primary address revision and update cache when meets not leader (#6279) ref tikv/pd#5895 Signed-off-by: lhy1024 Co-authored-by: Ti Chi Robot Signed-off-by: lhy1024 --------- Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Co-authored-by: Ti Chi Robot --- server/grpc_service.go | 49 ++++++++++++++++++++-------- server/server.go | 72 ++++++++++++++++++++---------------------- 2 files changed, 70 insertions(+), 51 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index db0a840b343..59b60775f5b 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -52,8 +52,9 @@ const ( heartbeatSendTimeout = 5 * time.Second // tso - maxMergeTSORequests = 10000 - defaultTSOProxyTimeout = 3 * time.Second + maxMergeTSORequests = 10000 + defaultTSOProxyTimeout = 3 * time.Second + maxRetryTimesGetGlobalTSOFromTSOServer = 3 ) // gRPC errors @@ -339,7 +340,11 @@ errHandling: if err != nil { log.Error("proxy forward tso error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCSend, err)) if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) { - s.updateServicePrimaryAddrCh <- struct{}{} + select { + case s.updateServicePrimaryAddrCh <- struct{}{}: + log.Info("update service primary address") + default: + } } select { case <-dispatcherCtx.Done(): @@ -1993,24 +1998,40 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest if !ok || forwardedHost == "" { return pdpb.Timestamp{}, ErrNotFoundTSOAddr } - forwardStream, err := s.getTSOForwardStream(forwardedHost) - if err != nil { - return pdpb.Timestamp{}, err - } - forwardStream.Send(&tsopb.TsoRequest{ + request := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: s.clusterID, KeyspaceId: utils.DefaultKeyspaceID, KeyspaceGroupId: utils.DefaultKeySpaceGroupID, }, Count: 1, - }) - ts, err := forwardStream.Recv() - if err != nil { - log.Error("get global tso from tso server failed", zap.Error(err)) - return pdpb.Timestamp{}, err } - return *ts.GetTimestamp(), nil + var ( + forwardStream tsopb.TSO_TsoClient + ts *tsopb.TsoResponse + err error + ) + for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ { + forwardStream, err = s.getTSOForwardStream(forwardedHost) + if err != nil { + return pdpb.Timestamp{}, err + } + forwardStream.Send(request) + ts, err = forwardStream.Recv() + if err != nil { + if strings.Contains(err.Error(), codes.Unavailable.String()) { + s.tsoClientPool.Lock() + delete(s.tsoClientPool.clients, forwardedHost) + s.tsoClientPool.Unlock() + continue + } + log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return pdpb.Timestamp{}, err + } + return *ts.GetTimestamp(), nil + } + log.Error("get global tso from tso service primary addr failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return pdpb.Timestamp{}, err } func (s *GrpcServer) getTSOForwardStream(forwardedHost string) (tsopb.TSO_TsoClient, error) { diff --git a/server/server.go b/server/server.go index 18889a09742..65a73009420 100644 --- a/server/server.go +++ b/server/server.go @@ -106,7 +106,7 @@ const ( // retryIntervalGetServicePrimary is the retry interval for getting primary addr. retryIntervalGetServicePrimary = 100 * time.Millisecond // TODO: move it to etcdutil - watchKEtcdChangeRetryInterval = 1 * time.Second + watchEtcdChangeRetryInterval = 1 * time.Second ) // EtcdStartTimeout the timeout of the startup etcd. @@ -207,9 +207,11 @@ type Server struct { auditBackends []audit.Backend - registry *registry.ServiceRegistry - mode string - servicePrimaryMap sync.Map /* Store as map[string]string */ + registry *registry.ServiceRegistry + mode string + servicePrimaryMap sync.Map /* Store as map[string]string */ + // updateServicePrimaryAddrCh is used to notify the server to update the service primary address. + // Note: it is only used in API service mode. updateServicePrimaryAddrCh chan struct{} } @@ -244,7 +246,6 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le }{ clients: make(map[string]tsopb.TSO_TsoClient), }, - updateServicePrimaryAddrCh: make(chan struct{}, 1), } s.handler = newHandler(s) @@ -1757,25 +1758,25 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) { defer s.serverLoopWg.Done() ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() - + s.updateServicePrimaryAddrCh = make(chan struct{}, 1) serviceKey := s.servicePrimaryKey(serviceName) var ( revision int64 err error ) for i := 0; i < maxRetryTimesGetServicePrimary; i++ { + revision, err = s.updateServicePrimaryAddr(serviceName) + if revision != 0 && err == nil { // update success + break + } select { case <-ctx.Done(): return case <-time.After(retryIntervalGetServicePrimary): } - revision, err = s.updateServicePrimaryAddr(serviceName) - if err == nil { - break - } } - if revision == 0 { - log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey)) + if err != nil { + log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey), zap.Error(err)) } log.Info("start to watch service primary addr", zap.String("service-key", serviceKey)) for { @@ -1789,35 +1790,14 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) { if err != nil { log.Error("watcher canceled unexpectedly and a new watcher will start after a while", zap.Int64("next-revision", nextRevision), - zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)), + zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)), zap.Error(err)) revision = nextRevision - time.Sleep(watchKEtcdChangeRetryInterval) + time.Sleep(watchEtcdChangeRetryInterval) } } } -// SetServicePrimaryAddr sets the primary address directly. -// Note: This function is only used for test. -func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { - s.servicePrimaryMap.Store(serviceName, addr) -} - -// updateServicePrimaryAddr updates the primary address from etcd with get operation. -func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int64, err error) { - serviceKey := s.servicePrimaryKey(serviceName) - primary := &tsopb.Participant{} - ok, revision, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary) - listenUrls := primary.GetListenUrls() - if !ok || err != nil || len(listenUrls) == 0 { - return 0, err - } - // listenUrls[0] is the primary service endpoint of the keyspace group - s.servicePrimaryMap.Store(serviceName, listenUrls[0]) - log.Info("update service primary addr", zap.String("service-key", serviceKey), zap.String("primary-addr", listenUrls[0])) - return revision, nil -} - // watchServicePrimaryAddr watches the primary address on etcd. func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string, revision int64) (nextRevision int64, err error) { serviceKey := s.servicePrimaryKey(serviceName) @@ -1826,12 +1806,15 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string for { WatchChan: - watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithPrefix(), clientv3.WithRev(revision)) + watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithRev(revision)) select { case <-ctx.Done(): return revision, nil case <-s.updateServicePrimaryAddrCh: revision, err = s.updateServicePrimaryAddr(serviceName) + if err != nil { + log.Warn("update service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err)) + } goto WatchChan case wresp := <-watchChan: if wresp.CompactRevision != 0 { @@ -1867,11 +1850,26 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string s.servicePrimaryMap.Delete(serviceName) } } - revision = wresp.Header.Revision + revision = wresp.Header.Revision + 1 } } } +// updateServicePrimaryAddr updates the primary address from etcd with get operation. +func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int64, err error) { + serviceKey := s.servicePrimaryKey(serviceName) + primary := &tsopb.Participant{} + ok, revision, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary) + listenUrls := primary.GetListenUrls() + if !ok || err != nil || len(listenUrls) == 0 { + return 0, err + } + // listenUrls[0] is the primary service endpoint of the keyspace group + s.servicePrimaryMap.Store(serviceName, listenUrls[0]) + log.Info("update service primary addr", zap.String("service-key", serviceKey), zap.String("primary-addr", listenUrls[0])) + return revision, nil +} + func (s *Server) servicePrimaryKey(serviceName string) string { return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary") }