Skip to content

Commit

Permalink
Add the lock back
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Mar 22, 2023
1 parent 3e98eb3 commit 6c9b730
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
53 changes: 36 additions & 17 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,39 @@ func WithMaxErrorRetry(count int) ClientOption {

var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching.
type serviceModeKeeper struct {
// RMutex here is for the future usage that there might be multiple goroutines
// triggering service mode switching concurrently.
sync.RWMutex
serviceMode pdpb.ServiceMode
tsoClient atomic.Value // *tsoClient
tsoSvcDiscovery ServiceDiscovery
}

func (smk *serviceModeKeeper) close() {
smk.Lock()
defer smk.Unlock()
switch smk.serviceMode {
case pdpb.ServiceMode_API_SVC_MODE:
smk.tsoSvcDiscovery.Close()
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
if tsoCli := smk.tsoClient.Load(); tsoCli != nil {
tsoCli.(*tsoClient).Close()
}
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
}
}

type client struct {
keyspaceID uint32
svrUrls []string
pdSvcDiscovery ServiceDiscovery
tokenDispatcher *tokenDispatcher

serviceMode pdpb.ServiceMode
tsoClient atomic.Value // *tsoClient
tsoSvcDiscovery ServiceDiscovery
// For service mode switching.
serviceModeKeeper

// For internal usage.
updateTokenConnectionCh chan struct{}
Expand Down Expand Up @@ -343,12 +367,7 @@ func (c *client) Close() {
c.cancel()
c.wg.Wait()

if tsoClient := c.getTSOClient(); tsoClient != nil {
tsoClient.Close()
}
if c.tsoSvcDiscovery != nil {
c.tsoSvcDiscovery.Close()
}
c.serviceModeKeeper.close()
c.pdSvcDiscovery.Close()

if c.tokenDispatcher != nil {
Expand All @@ -359,6 +378,8 @@ func (c *client) Close() {
}

func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
c.Lock()
defer c.Unlock()
if newMode == c.serviceMode {
return
}
Expand Down Expand Up @@ -389,22 +410,20 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
return
}
newTSOCli.Setup()

// Replace the old TSO client.
oldTSOClient := c.getTSOClient()
c.tsoClient.Store(newTSOCli)
oldTSOClient.Close()

// Replace the old TSO service discovery if needed.
oldTSOSvcDiscovery := c.tsoSvcDiscovery
// Set the new TSO service discovery if needed.
if newTSOSvcDiscovery != nil {
c.tsoSvcDiscovery = newTSOSvcDiscovery
// Close the old TSO service discovery safely after both the old client
// and service discovery are replaced.
if oldTSOSvcDiscovery != nil {
oldTSOSvcDiscovery.Close()
}
}
// Close the old TSO service discovery safely after the old client is closed.
if oldTSOSvcDiscovery != nil {
oldTSOSvcDiscovery.Close()
}

c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", c.serviceMode.String()),
Expand Down
4 changes: 2 additions & 2 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ type pdServiceDiscovery struct {

// serviceModeUpdateCb will be called when the service mode gets updated
serviceModeUpdateCb func(pdpb.ServiceMode)
// leaderSwitchedCbs will be called after the leader swichted
// leaderSwitchedCbs will be called after the leader switched
leaderSwitchedCbs []func()
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
// tsoLocalAllocLeadersUpdatedCb will be called when the local tso allocator
// leader list is updated. The input is a map {DC Localtion -> Leader Addr}
// leader list is updated. The input is a map {DC Location -> Leader Addr}
tsoLocalAllocLeadersUpdatedCb tsoLocalServAddrsUpdatedFunc
// tsoGlobalAllocLeaderUpdatedCb will be called when the global tso allocator
// leader is updated.
Expand Down
2 changes: 1 addition & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (c *tsoClient) tsoDispatcherCheckLoop() {
case <-ticker.C:
case <-c.checkTSODispatcherCh:
case <-loopCtx.Done():
log.Info("exit tso dispacther loop")
log.Info("exit tso dispatcher loop")
return
}
}
Expand Down

0 comments on commit 6c9b730

Please sign in to comment.