client: refine serviceModeKeeper code #6201

Merged
merged 3 commits into from
Mar 22, 2023
115 changes: 64 additions & 51 deletions client/client.go
@@ -240,19 +240,38 @@ func WithMaxErrorRetry(count int) ClientOption {

var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching
// serviceModeKeeper is for service mode switching.
type serviceModeKeeper struct {
svcModeMutex sync.RWMutex
// RMutex here is for the future usage that there might be multiple goroutines
// triggering service mode switching concurrently.
sync.RWMutex
serviceMode pdpb.ServiceMode
tsoClient atomic.Value
tsoClient atomic.Value // *tsoClient
tsoSvcDiscovery ServiceDiscovery
}

func (smk *serviceModeKeeper) close() {
smk.Lock()
defer smk.Unlock()
switch smk.serviceMode {
case pdpb.ServiceMode_API_SVC_MODE:
smk.tsoSvcDiscovery.Close()
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
if tsoCli := smk.tsoClient.Load(); tsoCli != nil {
tsoCli.(*tsoClient).Close()
}
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
}
}

type client struct {
keyspaceID uint32
svrUrls []string
pdSvcDiscovery ServiceDiscovery
tokenDispatcher *tokenDispatcher

// For service mode switching.
serviceModeKeeper

// For internal usage.
@@ -348,12 +367,7 @@ func (c *client) Close() {
c.cancel()
c.wg.Wait()

if tsoClient := c.getTSOClient(); tsoClient != nil {
tsoClient.Close()
}
if c.tsoSvcDiscovery != nil {
c.tsoSvcDiscovery.Close()
}
c.serviceModeKeeper.close()
c.pdSvcDiscovery.Close()

if c.tokenDispatcher != nil {
@@ -364,57 +378,56 @@ func (c *client) Close() {
}

func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
c.svcModeMutex.Lock()
@binshi-bing (Contributor) commented on Mar 21, 2023:
I prefer to keep this lock. Originally, I used the corresponding read lock in getTSOClient(), but eventually I used atomic.Value to store tsoClient and removed the read lock. After that, I still decided to keep this lock and the critical section in setServiceMode() for the following reasons:

  1. We plan to improve error handling. After the service mode is switched on the server side, requests will fail, and the client, in different goroutines, needs to call serviceModeUpdate() directly or schedule an immediate check instead of waiting for the 3s serviceModeUpdateInterval (I even changed the interval from the original 10s to the current 3s for this purpose). When the leader/primary is switched on the server side, we had a similar issue -- in that case, today we fail the requests immediately without retries, though we do check right away whether the membership has changed. All of this seems to need improvement.
  2. This lock's cost is negligible (it is taken every 3s), and it would be inconvenient to add it back later.
  3. I prefer to move client.setServiceMode to serviceModeKeeper.setServiceMode so that changing serviceMode, tsoClient and tsoSvcDiscovery atomically is more explicit, and the other parts of the client have no write access to these values, which makes it more modular.

The PR author (Member) replied:

I added the lock back. PTAL.

The contributor replied:

Sounds good. The change looks good to me.
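
For context, here is a minimal, self-contained sketch of the pattern discussed in this thread. The types and names below are simplified assumptions, not the actual PD client code: reads of the TSO client go through atomic.Value without taking any lock, while setServiceMode holds the write lock so that the mode and the client are always swapped together.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tsoClient is a stand-in for the real TSO client type.
type tsoClient struct{ mode string }

// Close is nil-safe, mirroring the nil check added in this PR.
func (c *tsoClient) Close() {
	if c == nil {
		return
	}
	fmt.Println("closing tso client for mode", c.mode)
}

// serviceModeKeeper mirrors the shape discussed above: the embedded RWMutex
// guards mode switches, while tsoClient is read lock-free via atomic.Value.
type serviceModeKeeper struct {
	sync.RWMutex
	serviceMode string
	tsoClient   atomic.Value // always stores a *tsoClient
}

// getTSOClient is the lock-free read path used on every TSO request.
func (k *serviceModeKeeper) getTSOClient() *tsoClient {
	if cli := k.tsoClient.Load(); cli != nil {
		return cli.(*tsoClient)
	}
	return nil
}

// setServiceMode switches the mode under the write lock so that serviceMode
// and tsoClient always change together, then closes the replaced client.
func (k *serviceModeKeeper) setServiceMode(newMode string) {
	k.Lock()
	defer k.Unlock()
	if newMode == k.serviceMode {
		return
	}
	old := k.getTSOClient()
	k.tsoClient.Store(&tsoClient{mode: newMode})
	old.Close() // safe even when no client was stored before
	k.serviceMode = newMode
}

func main() {
	var keeper serviceModeKeeper
	keeper.setServiceMode("pd")
	keeper.setServiceMode("api")
	fmt.Println("current mode:", keeper.getTSOClient().mode)
}
```

Under this sketch, concurrent readers calling getTSOClient never block on a mode switch, while the lock still keeps the write path serialized, which is the trade-off described in the comment above.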

defer c.svcModeMutex.Unlock()

c.Lock()
defer c.Unlock()
if newMode == c.serviceMode {
return
}

log.Info("changing service mode", zap.String("old-mode", pdpb.ServiceMode_name[int32(c.serviceMode)]),
zap.String("new-mode", pdpb.ServiceMode_name[int32(newMode)]))

if newMode == pdpb.ServiceMode_UNKNOWN_SVC_MODE {
log.Warn("intend to switch to unknown service mode. do nothing")
return
}

var newTSOCli *tsoClient
tsoSvcDiscovery := c.tsoSvcDiscovery
ctx, cancel := context.WithCancel(c.ctx)

if newMode == pdpb.ServiceMode_PD_SVC_MODE {
newTSOCli = newTSOClient(ctx, cancel, c.option, c.keyspaceID,
c.pdSvcDiscovery, c.pdSvcDiscovery.(tsoAllocatorEventSource), &pdTSOStreamBuilderFactory{})
newTSOCli.Setup()
} else {
tsoSvcDiscovery = newTSOServiceDiscovery(ctx, cancel, MetaStorageClient(c),
log.Info("[pd] changing service mode",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
// Re-create a new TSO client.
var (
newTSOCli *tsoClient
newTSOSvcDiscovery ServiceDiscovery
)
switch newMode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(c.ctx, MetaStorageClient(c),
c.GetClusterID(c.ctx), c.keyspaceID, c.svrUrls, c.tlsCfg, c.option)
newTSOCli = newTSOClient(ctx, cancel, c.option, c.keyspaceID,
tsoSvcDiscovery, tsoSvcDiscovery.(tsoAllocatorEventSource), &tsoTSOStreamBuilderFactory{})
if err := tsoSvcDiscovery.Init(); err != nil {
cancel()
log.Error("failed to initialize tso service discovery. keep the current service mode",
zap.Strings("svr-urls", c.svrUrls), zap.String("current-mode", pdpb.ServiceMode_name[int32(c.serviceMode)]), zap.Error(err))
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{})
if err := newTSOSvcDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
zap.Strings("svr-urls", c.svrUrls), zap.String("current-mode", c.serviceMode.String()), zap.Error(err))
return
}
newTSOCli.Setup()
}

// cleanup the old tso client
if oldTSOCli := c.getTSOClient(); oldTSOCli != nil {
oldTSOCli.Close()
}
if c.serviceMode == pdpb.ServiceMode_API_SVC_MODE {
c.tsoSvcDiscovery.Close()
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
log.Warn("[pd] intend to switch to unknown service mode, just return")
return
}

c.tsoSvcDiscovery = tsoSvcDiscovery
newTSOCli.Setup()
// Replace the old TSO client.
oldTSOClient := c.getTSOClient()
c.tsoClient.Store(newTSOCli)

log.Info("service mode changed", zap.String("old-mode", pdpb.ServiceMode_name[int32(c.serviceMode)]),
zap.String("new-mode", pdpb.ServiceMode_name[int32(newMode)]))
oldTSOClient.Close()
// Replace the old TSO service discovery if needed.
oldTSOSvcDiscovery := c.tsoSvcDiscovery
if newTSOSvcDiscovery != nil {
c.tsoSvcDiscovery = newTSOSvcDiscovery
// Close the old TSO service discovery safely after both the old client
// and service discovery are replaced.
if oldTSOSvcDiscovery != nil {
@binshi-bing (Contributor) commented on Mar 22, 2023:
Placing "if oldTSOSvcDiscovery != nil {...}" here is very precise, but it reduces readability.

A contributor replied:

either way is ok.
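
As an aside, here is a small sketch of the ordering the code comment above describes, with the real types reduced to placeholders (the resource and keeper names are illustrative assumptions): the replacements are published before the old objects are closed, and the nil check keeps the very first switch, when no TSO service discovery exists yet, from panicking.

```go
package main

import "fmt"

// resource stands in for both the TSO client and the TSO service discovery.
type resource struct{ name string }

func (r *resource) Close() {
	if r == nil {
		return // nothing was created yet, e.g. on the very first mode switch
	}
	fmt.Println("closed", r.name)
}

type keeper struct {
	tsoClient       *resource
	tsoSvcDiscovery *resource
}

// switchTo publishes the replacements first and only then closes the old
// objects, so nothing still reachable from the keeper is closed under a reader.
func (k *keeper) switchTo(newClient, newDiscovery *resource) {
	oldClient := k.tsoClient
	k.tsoClient = newClient

	oldDiscovery := k.tsoSvcDiscovery
	if newDiscovery != nil {
		k.tsoSvcDiscovery = newDiscovery
		// Close the old discovery only after both references are replaced.
		oldDiscovery.Close()
	}
	oldClient.Close()
}

func main() {
	k := &keeper{}
	// First switch: PD mode, no separate TSO service discovery yet.
	k.switchTo(&resource{name: "pd tso client"}, nil)
	// Second switch: API mode brings its own TSO service discovery.
	k.switchTo(&resource{name: "tso service client"}, &resource{name: "tso service discovery"})
}
```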

oldTSOSvcDiscovery.Close()
}
}
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
}

func (c *client) getTSOClient() *tsoClient {
64 changes: 36 additions & 28 deletions client/pd_service_discovery.go
@@ -114,13 +114,13 @@ type pdServiceDiscovery struct {

// serviceModeUpdateCb will be called when the service mode gets updated
serviceModeUpdateCb func(pdpb.ServiceMode)
// leaderSwitchedCbs will be called after the leader swichted
// leaderSwitchedCbs will be called after the leader switched
leaderSwitchedCbs []func()
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
// tsoLocalAllocLeadersUpdatedCb will be called when the local tso allocator
// leader list is updated. The input is a map {DC Localtion -> Leader Addr}
// leader list is updated. The input is a map {DC Location -> Leader Addr}
tsoLocalAllocLeadersUpdatedCb tsoLocalServAddrsUpdatedFunc
// tsoGlobalAllocLeaderUpdatedCb will be called when the global tso allocator
// leader is updated.
@@ -138,9 +138,13 @@ type pdServiceDiscovery struct {
option *option
}

// newPDServiceDiscovery returns a new baseClient.
func newPDServiceDiscovery(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup,
serviceModeUpdateCb func(pdpb.ServiceMode), urls []string, tlsCfg *tlsutil.TLSConfig, option *option) *pdServiceDiscovery {
// newPDServiceDiscovery returns a new PD service discovery-based client.
func newPDServiceDiscovery(
ctx context.Context, cancel context.CancelFunc,
wg *sync.WaitGroup,
serviceModeUpdateCb func(pdpb.ServiceMode),
urls []string, tlsCfg *tlsutil.TLSConfig, option *option,
) *pdServiceDiscovery {
pdsd := &pdServiceDiscovery{
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
@@ -155,26 +159,27 @@ func newPDServiceDiscovery(ctx context.Context, cancel context.CancelFunc, wg *s
}

func (c *pdServiceDiscovery) Init() error {
if !c.isInitialized {
if err := c.initRetry(c.initClusterID); err != nil {
c.cancel()
return err
}
if err := c.initRetry(c.updateMember); err != nil {
c.cancel()
return err
}
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))
if c.isInitialized {
return nil
}

c.updateServiceMode()
if err := c.initRetry(c.initClusterID); err != nil {
c.cancel()
return err
}
if err := c.initRetry(c.updateMember); err != nil {
c.cancel()
return err
}
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

c.wg.Add(2)
go c.updateMemberLoop()
go c.updateServiceModeLoop()
c.updateServiceMode()

c.isInitialized = true
}
c.wg.Add(2)
go c.updateMemberLoop()
go c.updateServiceModeLoop()

c.isInitialized = true
return nil
}

@@ -198,13 +203,15 @@ func (c *pdServiceDiscovery) updateMemberLoop() {

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

for {
select {
case <-c.checkMembershipCh:
case <-time.After(memberUpdateInterval):
case <-ctx.Done():
return
case <-ticker.C:
case <-c.checkMembershipCh:
}
failpoint.Inject("skipUpdateMember", func() {
failpoint.Continue()
@@ -220,25 +227,26 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() {

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(serviceModeUpdateInterval)
defer ticker.Stop()

for {
select {
case <-time.After(serviceModeUpdateInterval):
case <-ctx.Done():
return
case <-ticker.C:
}

c.updateServiceMode()
}
}

// Close releases all resources
// Close releases all resources.
func (c *pdServiceDiscovery) Close() {
c.closeOnce.Do(func() {
log.Info("close pd service discovery")
log.Info("[pd] close pd service discovery client")
c.clientConns.Range(func(key, cc interface{}) bool {
if err := cc.(*grpc.ClientConn).Close(); err != nil {
log.Error("[pd] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
log.Error("[pd] failed to close grpc clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
}
c.clientConns.Delete(key)
return true
11 changes: 9 additions & 2 deletions client/tso_client.go
@@ -91,8 +91,11 @@ type tsoClient struct {
}

// newTSOClient returns a new TSO client.
func newTSOClient(ctx context.Context, cancel context.CancelFunc, option *option, keyspaceID uint32,
svcDiscovery ServiceDiscovery, eventSrc tsoAllocatorEventSource, factory tsoStreamBuilderFactory) *tsoClient {
func newTSOClient(
A contributor commented:

This is the code style I preferred when using other programming languages, such as C++ and Java, but the coding style I used in this code (on the left side) follows other places in the code base, though there seem to be multiple coding styles. Do we have a uniform coding style, or does the style just need to pass the go format check?

The PR author (Member) replied:

There are no strong principles here; I just want to group the parameters into lines to reduce the single-line length.

The contributor replied:

OK, I don't have a strong preference. Your change here looks good to me.

ctx context.Context, option *option, keyspaceID uint32,
svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory,
) *tsoClient {
ctx, cancel := context.WithCancel(ctx)
c := &tsoClient{
ctx: ctx,
cancel: cancel,
@@ -105,6 +108,9 @@ func newTSOClient(ctx context.Context, cancel context.CancelFunc, option *option
updateTSOConnectionCtxsCh: make(chan struct{}, 1),
}

eventSrc := svcDiscovery.(tsoAllocatorEventSource)
eventSrc.SetTSOLocalServAddrsUpdatedCallback(c.updateTSOLocalServAddrs)
eventSrc.SetTSOGlobalServAddrUpdatedCallback(c.updateTSOGlobalServAddr)
c.svcDiscovery.AddServiceAddrsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs)
@@ -124,6 +128,9 @@ func (c *tsoClient) Setup() {

// Close closes the TSO client
func (c *tsoClient) Close() {
if c == nil {
return
}
log.Info("closing tso client")

c.cancel()
2 changes: 1 addition & 1 deletion client/tso_dispatcher.go
@@ -211,7 +211,7 @@ func (c *tsoClient) tsoDispatcherCheckLoop() {
case <-ticker.C:
case <-c.checkTSODispatcherCh:
case <-loopCtx.Done():
log.Info("exit tso dispacther loop")
log.Info("exit tso dispatcher loop")
return
}
}
7 changes: 5 additions & 2 deletions client/tso_service_discovery.go
@@ -79,8 +79,11 @@ type tsoServiceDiscovery struct {
}

// newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service.
func newTSOServiceDiscovery(ctx context.Context, cancel context.CancelFunc, metacli MetaStorageClient,
clusterID uint64, keyspaceID uint32, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) ServiceDiscovery {
func newTSOServiceDiscovery(
ctx context.Context, metacli MetaStorageClient,
clusterID uint64, keyspaceID uint32, urls []string, tlsCfg *tlsutil.TLSConfig, option *option,
) ServiceDiscovery {
ctx, cancel := context.WithCancel(ctx)
c := &tsoServiceDiscovery{
ctx: ctx,
cancel: cancel,