diff --git a/client/client.go b/client/client.go
index b30f9534caf..22b037d1ede 100644
--- a/client/client.go
+++ b/client/client.go
@@ -240,19 +240,38 @@ func WithMaxErrorRetry(count int) ClientOption {
 
 var _ Client = (*client)(nil)
 
-// serviceModeKeeper is for service mode switching
+// serviceModeKeeper is for service mode switching.
 type serviceModeKeeper struct {
-	svcModeMutex    sync.RWMutex
+	// RMutex here is for the future usage that there might be multiple goroutines
+	// triggering service mode switching concurrently.
+	sync.RWMutex
 	serviceMode     pdpb.ServiceMode
-	tsoClient       atomic.Value
+	tsoClient       atomic.Value // *tsoClient
 	tsoSvcDiscovery ServiceDiscovery
 }
 
+func (smk *serviceModeKeeper) close() {
+	smk.Lock()
+	defer smk.Unlock()
+	switch smk.serviceMode {
+	case pdpb.ServiceMode_API_SVC_MODE:
+		smk.tsoSvcDiscovery.Close()
+		fallthrough
+	case pdpb.ServiceMode_PD_SVC_MODE:
+		if tsoCli := smk.tsoClient.Load(); tsoCli != nil {
+			tsoCli.(*tsoClient).Close()
+		}
+	case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
+	}
+}
+
 type client struct {
 	keyspaceID      uint32
 	svrUrls         []string
 	pdSvcDiscovery  ServiceDiscovery
 	tokenDispatcher *tokenDispatcher
+
+	// For service mode switching.
 	serviceModeKeeper
 
 	// For internal usage.
@@ -348,12 +367,7 @@ func (c *client) Close() {
 	c.cancel()
 	c.wg.Wait()
 
-	if tsoClient := c.getTSOClient(); tsoClient != nil {
-		tsoClient.Close()
-	}
-	if c.tsoSvcDiscovery != nil {
-		c.tsoSvcDiscovery.Close()
-	}
+	c.serviceModeKeeper.close()
 	c.pdSvcDiscovery.Close()
 
 	if c.tokenDispatcher != nil {
@@ -364,57 +378,56 @@ func (c *client) Close() {
 }
 
 func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
-	c.svcModeMutex.Lock()
-	defer c.svcModeMutex.Unlock()
-
+	c.Lock()
+	defer c.Unlock()
 	if newMode == c.serviceMode {
 		return
 	}
-
-	log.Info("changing service mode", zap.String("old-mode", pdpb.ServiceMode_name[int32(c.serviceMode)]),
-		zap.String("new-mode", pdpb.ServiceMode_name[int32(newMode)]))
-
-	if newMode == pdpb.ServiceMode_UNKNOWN_SVC_MODE {
-		log.Warn("intend to switch to unknown service mode. do nothing")
-		return
-	}
-
-	var newTSOCli *tsoClient
-	tsoSvcDiscovery := c.tsoSvcDiscovery
-	ctx, cancel := context.WithCancel(c.ctx)
-
-	if newMode == pdpb.ServiceMode_PD_SVC_MODE {
-		newTSOCli = newTSOClient(ctx, cancel, c.option, c.keyspaceID,
-			c.pdSvcDiscovery, c.pdSvcDiscovery.(tsoAllocatorEventSource), &pdTSOStreamBuilderFactory{})
-		newTSOCli.Setup()
-	} else {
-		tsoSvcDiscovery = newTSOServiceDiscovery(ctx, cancel, MetaStorageClient(c),
+	log.Info("[pd] changing service mode",
+		zap.String("old-mode", c.serviceMode.String()),
+		zap.String("new-mode", newMode.String()))
+	// Re-create a new TSO client.
+	var (
+		newTSOCli          *tsoClient
+		newTSOSvcDiscovery ServiceDiscovery
+	)
+	switch newMode {
+	case pdpb.ServiceMode_PD_SVC_MODE:
+		newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
+			c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
+	case pdpb.ServiceMode_API_SVC_MODE:
+		newTSOSvcDiscovery = newTSOServiceDiscovery(c.ctx, MetaStorageClient(c),
 			c.GetClusterID(c.ctx), c.keyspaceID, c.svrUrls, c.tlsCfg, c.option)
-		newTSOCli = newTSOClient(ctx, cancel, c.option, c.keyspaceID,
-			tsoSvcDiscovery, tsoSvcDiscovery.(tsoAllocatorEventSource), &tsoTSOStreamBuilderFactory{})
-		if err := tsoSvcDiscovery.Init(); err != nil {
-			cancel()
-			log.Error("failed to initialize tso service discovery. keep the current service mode",
-				zap.Strings("svr-urls", c.svrUrls), zap.String("current-mode", pdpb.ServiceMode_name[int32(c.serviceMode)]), zap.Error(err))
+		newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
+			newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{})
+		if err := newTSOSvcDiscovery.Init(); err != nil {
+			log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
+				zap.Strings("svr-urls", c.svrUrls), zap.String("current-mode", c.serviceMode.String()), zap.Error(err))
 			return
 		}
-		newTSOCli.Setup()
-	}
-
-	// cleanup the old tso client
-	if oldTSOCli := c.getTSOClient(); oldTSOCli != nil {
-		oldTSOCli.Close()
-	}
-	if c.serviceMode == pdpb.ServiceMode_API_SVC_MODE {
-		c.tsoSvcDiscovery.Close()
+	case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
+		log.Warn("[pd] intend to switch to unknown service mode, just return")
+		return
 	}
-
-	c.tsoSvcDiscovery = tsoSvcDiscovery
+	newTSOCli.Setup()
+	// Replace the old TSO client.
+	oldTSOClient := c.getTSOClient()
 	c.tsoClient.Store(newTSOCli)
-
-	log.Info("service mode changed", zap.String("old-mode", pdpb.ServiceMode_name[int32(c.serviceMode)]),
-		zap.String("new-mode", pdpb.ServiceMode_name[int32(newMode)]))
+	oldTSOClient.Close()
+	// Replace the old TSO service discovery if needed.
+	oldTSOSvcDiscovery := c.tsoSvcDiscovery
+	if newTSOSvcDiscovery != nil {
+		c.tsoSvcDiscovery = newTSOSvcDiscovery
+		// Close the old TSO service discovery safely after both the old client
+		// and service discovery are replaced.
+		if oldTSOSvcDiscovery != nil {
+			oldTSOSvcDiscovery.Close()
+		}
+	}
 	c.serviceMode = newMode
+	log.Info("[pd] service mode changed",
+		zap.String("old-mode", c.serviceMode.String()),
+		zap.String("new-mode", newMode.String()))
 }
 
 func (c *client) getTSOClient() *tsoClient {
diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go
index 98f51f0a47b..73074943304 100644
--- a/client/pd_service_discovery.go
+++ b/client/pd_service_discovery.go
@@ -114,13 +114,13 @@ type pdServiceDiscovery struct {
 
 	// serviceModeUpdateCb will be called when the service mode gets updated
 	serviceModeUpdateCb func(pdpb.ServiceMode)
-	// leaderSwitchedCbs will be called after the leader swichted
+	// leaderSwitchedCbs will be called after the leader switched
 	leaderSwitchedCbs []func()
 	// membersChangedCbs will be called after there is any membership change in the
 	// leader and followers
 	membersChangedCbs []func()
 	// tsoLocalAllocLeadersUpdatedCb will be called when the local tso allocator
-	// leader list is updated. The input is a map {DC Localtion -> Leader Addr}
+	// leader list is updated. The input is a map {DC Location -> Leader Addr}
 	tsoLocalAllocLeadersUpdatedCb tsoLocalServAddrsUpdatedFunc
 	// tsoGlobalAllocLeaderUpdatedCb will be called when the global tso allocator
 	// leader is updated.
@@ -138,9 +138,13 @@ type pdServiceDiscovery struct {
 	option *option
 }
 
-// newPDServiceDiscovery returns a new baseClient.
-func newPDServiceDiscovery(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup,
-	serviceModeUpdateCb func(pdpb.ServiceMode), urls []string, tlsCfg *tlsutil.TLSConfig, option *option) *pdServiceDiscovery {
+// newPDServiceDiscovery returns a new PD service discovery-based client.
+func newPDServiceDiscovery(
+	ctx context.Context, cancel context.CancelFunc,
+	wg *sync.WaitGroup,
+	serviceModeUpdateCb func(pdpb.ServiceMode),
+	urls []string, tlsCfg *tlsutil.TLSConfig, option *option,
+) *pdServiceDiscovery {
 	pdsd := &pdServiceDiscovery{
 		checkMembershipCh: make(chan struct{}, 1),
 		ctx:               ctx,
@@ -155,26 +159,27 @@ func newPDServiceDiscovery(ctx context.Context, cancel context.CancelFunc, wg *s
 }
 
 func (c *pdServiceDiscovery) Init() error {
-	if !c.isInitialized {
-		if err := c.initRetry(c.initClusterID); err != nil {
-			c.cancel()
-			return err
-		}
-		if err := c.initRetry(c.updateMember); err != nil {
-			c.cancel()
-			return err
-		}
-		log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))
+	if c.isInitialized {
+		return nil
+	}
 
-		c.updateServiceMode()
+	if err := c.initRetry(c.initClusterID); err != nil {
+		c.cancel()
+		return err
+	}
+	if err := c.initRetry(c.updateMember); err != nil {
+		c.cancel()
+		return err
+	}
+	log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))
 
-		c.wg.Add(2)
-		go c.updateMemberLoop()
-		go c.updateServiceModeLoop()
+	c.updateServiceMode()
 
-		c.isInitialized = true
-	}
+	c.wg.Add(2)
+	go c.updateMemberLoop()
+	go c.updateServiceModeLoop()
+	c.isInitialized = true
 	return nil
 }
 
@@ -198,13 +203,15 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
 	ctx, cancel := context.WithCancel(c.ctx)
 	defer cancel()
 
+	ticker := time.NewTicker(memberUpdateInterval)
+	defer ticker.Stop()
 	for {
 		select {
-		case <-c.checkMembershipCh:
-		case <-time.After(memberUpdateInterval):
 		case <-ctx.Done():
 			return
+		case <-ticker.C:
+		case <-c.checkMembershipCh:
 		}
 		failpoint.Inject("skipUpdateMember", func() {
 			failpoint.Continue()
 		})
@@ -220,25 +227,26 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() {
 	ctx, cancel := context.WithCancel(c.ctx)
 	defer cancel()
 
+	ticker := time.NewTicker(serviceModeUpdateInterval)
+	defer ticker.Stop()
 	for {
 		select {
-		case <-time.After(serviceModeUpdateInterval):
 		case <-ctx.Done():
 			return
+		case <-ticker.C:
 		}
-
 		c.updateServiceMode()
 	}
 }
 
-// Close releases all resources
+// Close releases all resources.
 func (c *pdServiceDiscovery) Close() {
 	c.closeOnce.Do(func() {
-		log.Info("close pd service discovery")
+		log.Info("[pd] close pd service discovery client")
 		c.clientConns.Range(func(key, cc interface{}) bool {
 			if err := cc.(*grpc.ClientConn).Close(); err != nil {
-				log.Error("[pd] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
+				log.Error("[pd] failed to close grpc clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
 			}
 			c.clientConns.Delete(key)
 			return true
diff --git a/client/tso_client.go b/client/tso_client.go
index f1542ce4ed5..c5427af9dc3 100644
--- a/client/tso_client.go
+++ b/client/tso_client.go
@@ -91,8 +91,11 @@ type tsoClient struct {
 }
 
 // newTSOClient returns a new TSO client.
-func newTSOClient(ctx context.Context, cancel context.CancelFunc, option *option, keyspaceID uint32,
-	svcDiscovery ServiceDiscovery, eventSrc tsoAllocatorEventSource, factory tsoStreamBuilderFactory) *tsoClient {
+func newTSOClient(
+	ctx context.Context, option *option, keyspaceID uint32,
+	svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory,
+) *tsoClient {
+	ctx, cancel := context.WithCancel(ctx)
 	c := &tsoClient{
 		ctx:                       ctx,
 		cancel:                    cancel,
@@ -105,6 +108,7 @@ func newTSOClient(ctx context.Context, cancel context.CancelFunc, option *option
 		updateTSOConnectionCtxsCh: make(chan struct{}, 1),
 	}
 
+	eventSrc := svcDiscovery.(tsoAllocatorEventSource)
 	eventSrc.SetTSOLocalServAddrsUpdatedCallback(c.updateTSOLocalServAddrs)
 	eventSrc.SetTSOGlobalServAddrUpdatedCallback(c.updateTSOGlobalServAddr)
 	c.svcDiscovery.AddServiceAddrsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs)
@@ -124,6 +128,9 @@ func (c *tsoClient) Setup() {
 
 // Close closes the TSO client
 func (c *tsoClient) Close() {
+	if c == nil {
+		return
+	}
 	log.Info("closing tso client")
 	c.cancel()
diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go
index 13bbda1a707..7af4c859a3e 100644
--- a/client/tso_dispatcher.go
+++ b/client/tso_dispatcher.go
@@ -211,7 +211,7 @@ func (c *tsoClient) tsoDispatcherCheckLoop() {
 		case <-ticker.C:
 		case <-c.checkTSODispatcherCh:
 		case <-loopCtx.Done():
-			log.Info("exit tso dispacther loop")
+			log.Info("exit tso dispatcher loop")
 			return
 		}
 	}
diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go
index 96602139a98..759e5306906 100644
--- a/client/tso_service_discovery.go
+++ b/client/tso_service_discovery.go
@@ -79,8 +79,11 @@ type tsoServiceDiscovery struct {
 }
 
 // newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service.
-func newTSOServiceDiscovery(ctx context.Context, cancel context.CancelFunc, metacli MetaStorageClient,
-	clusterID uint64, keyspaceID uint32, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) ServiceDiscovery {
+func newTSOServiceDiscovery(
+	ctx context.Context, metacli MetaStorageClient,
+	clusterID uint64, keyspaceID uint32, urls []string, tlsCfg *tlsutil.TLSConfig, option *option,
+) ServiceDiscovery {
+	ctx, cancel := context.WithCancel(ctx)
 	c := &tsoServiceDiscovery{
 		ctx:            ctx,
 		cancel:         cancel,