mcs: fix watch primary address revision and update cache when meets not leader (#6279)

ref #5895

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
lhy1024 and ti-chi-bot authored Apr 17, 2023
1 parent 8c9b4fb commit b9a03d2
Showing 5 changed files with 131 additions and 33 deletions.
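The heart of the change is a get-then-watch pattern: read the primary address together with the revision etcd reports for it, cache it, and start watching from that revision so no update slips between the get and the watch. A minimal, self-contained sketch of the pattern follows (hypothetical names and the etcd client v3.4 import path; a sketch of the idea, not the PD code itself):

```go
// Sketch: cache a key's value, then watch from the revision the get observed.
package sketch

import (
	"context"

	"go.etcd.io/etcd/clientv3"
)

func cacheAndWatch(ctx context.Context, cli *clientv3.Client, key string, cache func(string)) error {
	resp, err := cli.Get(ctx, key)
	if err != nil {
		return err
	}
	if len(resp.Kvs) > 0 {
		cache(string(resp.Kvs[0].Value)) // refresh the cache first
	}
	// Start the watch at the revision the get observed (+1 skips replaying
	// the put we just cached), so no intervening update is missed.
	for wresp := range cli.Watch(ctx, key, clientv3.WithRev(resp.Header.Revision+1)) {
		if err := wresp.Err(); err != nil {
			return err // caller restarts the watch from the revision it reached
		}
		for _, ev := range wresp.Events {
			if ev.Type == clientv3.EventTypePut {
				cache(string(ev.Kv.Value))
			}
		}
	}
	return ctx.Err()
}
```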
10 changes: 5 additions & 5 deletions pkg/tso/keyspace_group_manager.go
@@ -51,7 +51,7 @@ const (
defaultLoadKeyspaceGroupsBatchSize = int64(400)
defaultLoadFromEtcdRetryInterval = 500 * time.Millisecond
defaultLoadFromEtcdMaxRetryTimes = int(defaultLoadKeyspaceGroupsTimeout / defaultLoadFromEtcdRetryInterval)
watchKEtcdChangeRetryInterval = 1 * time.Second
watchEtcdChangeRetryInterval = 1 * time.Second
)

type state struct {
@@ -414,7 +414,7 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups(
return revision, kgs, resp.More, nil
}

// startKeyspaceGroupsMetaWatchLoop Repeatedly watches any change in keyspace group membership/distribution
// startKeyspaceGroupsMetaWatchLoop repeatedly watches any change in keyspace group membership/distribution
// and apply the change dynamically.
func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64) {
defer logutil.LogPanic()
@@ -430,11 +430,11 @@ func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64

nextRevision, err := kgm.watchKeyspaceGroupsMetaChange(revision)
if err != nil {
log.Error("watcher canceled unexpectedly. Will start a new watcher after a while",
log.Error("watcher canceled unexpectedly and a new watcher will start after a while",
zap.Int64("next-revision", nextRevision),
zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)),
zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)),
zap.Error(err))
time.Sleep(watchKEtcdChangeRetryInterval)
time.Sleep(watchEtcdChangeRetryInterval)
}
}
}
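The loop above (and its twin added to server.go below) follows a sleep-and-rewatch shape: a failed watcher hands back the revision it reached, and the next watcher resumes from there. A condensed sketch, with `watchOnce` as a hypothetical stand-in for `watchKeyspaceGroupsMetaChange`:

```go
// Sketch of the rewatch-after-failure shape: each failed watcher reports the
// revision it reached; the loop sleeps, then resumes from that revision.
package sketch

import (
	"context"
	"time"
)

func watchLoop(ctx context.Context, revision int64,
	watchOnce func(context.Context, int64) (int64, error)) {
	const retryInterval = time.Second // mirrors watchEtcdChangeRetryInterval
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		next, err := watchOnce(ctx, revision)
		if err != nil {
			revision = next // resume where the failed watcher stopped
			time.Sleep(retryInterval)
		}
	}
}
```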
30 changes: 26 additions & 4 deletions pkg/utils/tsoutil/tso_dispatcher.go
@@ -16,6 +16,7 @@ package tsoutil

import (
"context"
"strings"
"sync"
"time"

@@ -58,20 +59,32 @@ func NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize prometheus.Histo

// DispatchRequest is the entry point for dispatching/forwarding a tso request to the destination host
func (s *TSODispatcher) DispatchRequest(
ctx context.Context, req Request, tsoProtoFactory ProtoFactory, doneCh <-chan struct{}, errCh chan<- error) {
ctx context.Context,
req Request,
tsoProtoFactory ProtoFactory,
doneCh <-chan struct{},
errCh chan<- error,
updateServicePrimaryAddrChs ...chan<- struct{}) {
val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests))
reqCh := val.(chan Request)
if !loaded {
tsDeadlineCh := make(chan deadline, 1)
go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh)
go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh, updateServicePrimaryAddrChs...)
go watchTSDeadline(ctx, tsDeadlineCh)
}
reqCh <- req
}

func (s *TSODispatcher) dispatch(
ctx context.Context, tsoProtoFactory ProtoFactory, forwardedHost string, clientConn *grpc.ClientConn,
tsoRequestCh <-chan Request, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) {
ctx context.Context,
tsoProtoFactory ProtoFactory,
forwardedHost string,
clientConn *grpc.ClientConn,
tsoRequestCh <-chan Request,
tsDeadlineCh chan<- deadline,
doneCh <-chan struct{},
errCh chan<- error,
updateServicePrimaryAddrChs ...chan<- struct{}) {
defer logutil.LogPanic()
dispatcherCtx, ctxCancel := context.WithCancel(ctx)
defer ctxCancel()
@@ -98,6 +111,7 @@ func (s *TSODispatcher) dispatch(
defer cancel()

requests := make([]Request, maxMergeRequests+1)
needUpdateServicePrimaryAddr := len(updateServicePrimaryAddrChs) > 0 && updateServicePrimaryAddrChs[0] != nil
for {
select {
case first := <-tsoRequestCh:
@@ -123,6 +137,14 @@ func (s *TSODispatcher) dispatch(
log.Error("proxy forward tso error",
zap.String("forwarded-host", forwardedHost),
errs.ZapError(errs.ErrGRPCSend, err))
if needUpdateServicePrimaryAddr {
if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) {
select {
case updateServicePrimaryAddrChs[0] <- struct{}{}:
default:
}
}
}
select {
case <-dispatcherCtx.Done():
return
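The dispatcher's new error branch only nudges the server: a buffered channel of size 1 plus a non-blocking send collapses any number of concurrent not-leader errors into at most one pending refresh. A minimal sketch of that coalescing notify (the error substrings are illustrative; the real check matches errs.NotLeaderErr and errs.MismatchLeaderErr):

```go
// Sketch: coalesce "primary may have changed" signals into one pending refresh.
package sketch

import "strings"

func notifyOnLeaderError(err error, updateCh chan<- struct{}) {
	// Only leadership errors imply the cached primary address is stale.
	if !strings.Contains(err.Error(), "not leader") &&
		!strings.Contains(err.Error(), "mismatch leader") {
		return
	}
	select {
	case updateCh <- struct{}{}: // queue exactly one refresh
	default: // one is already pending; drop the duplicate
	}
}
```

With `updateCh := make(chan struct{}, 1)` on the receiving side, a burst of failed forwards costs a single etcd get rather than one per request.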
2 changes: 1 addition & 1 deletion server/grpc_service.go
@@ -224,7 +224,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
}

tsoRequest := tsoutil.NewPDProtoRequest(forwardedHost, clientConn, request, stream)
s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh)
s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh, s.updateServicePrimaryAddrCh)
continue
}

119 changes: 97 additions & 22 deletions server/server.go
@@ -104,6 +104,8 @@ const (
maxRetryTimesGetServicePrimary = 25
// retryIntervalGetServicePrimary is the retry interval for getting primary addr.
retryIntervalGetServicePrimary = 100 * time.Millisecond
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second
)

// EtcdStartTimeout the timeout of the startup etcd.
@@ -215,6 +217,9 @@ type Server struct {
registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
// updateServicePrimaryAddrCh is used to notify the server to update the service primary address.
// Note: it is only used in API service mode.
updateServicePrimaryAddrCh chan struct{}
}

// HandlerBuilder builds a server HTTP handler.
@@ -560,7 +565,7 @@ func (s *Server) startServerLoop(ctx context.Context) {
go s.encryptionKeyManagerLoop()
if s.IsAPIServiceMode() { // disable tso service in api server
s.serverLoopWg.Add(1)
go s.watchServicePrimaryAddrLoop(mcs.TSOServiceName)
go s.startWatchServicePrimaryAddrLoop(mcs.TSOServiceName)
}
}

@@ -1714,43 +1719,92 @@ func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string)
return "", false
}

func (s *Server) watchServicePrimaryAddrLoop(serviceName string) {
// startWatchServicePrimaryAddrLoop starts a loop to watch the primary address of a given service.
func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()

serviceKey := fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary")
log.Info("start to watch", zap.String("service-key", serviceKey))

primary := &tsopb.Participant{}
ok, rev, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary)
if err != nil {
log.Error("get service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err))
s.updateServicePrimaryAddrCh = make(chan struct{}, 1)
serviceKey := s.servicePrimaryKey(serviceName)
var (
revision int64
err error
)
for i := 0; i < maxRetryTimesGetServicePrimary; i++ {
revision, err = s.updateServicePrimaryAddr(serviceName)
if revision != 0 && err == nil { // update success
break
}
select {
case <-ctx.Done():
return
case <-time.After(retryIntervalGetServicePrimary):
}
}
listenUrls := primary.GetListenUrls()
if ok && len(listenUrls) > 0 {
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
} else {
log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey))
if err != nil {
log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey), zap.Error(err))
}

watchChan := s.client.Watch(ctx, serviceKey, clientv3.WithPrefix(), clientv3.WithRev(rev))
log.Info("start to watch service primary addr", zap.String("service-key", serviceKey))
for {
select {
case <-ctx.Done():
log.Info("server is closed, exist watch service primary addr loop", zap.String("service", serviceName))
return
case res := <-watchChan:
for _, event := range res.Events {
default:
}
nextRevision, err := s.watchServicePrimaryAddr(ctx, serviceName, revision)
if err != nil {
log.Error("watcher canceled unexpectedly and a new watcher will start after a while",
zap.Int64("next-revision", nextRevision),
zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)),
zap.Error(err))
revision = nextRevision
time.Sleep(watchEtcdChangeRetryInterval)
}
}
}

// watchServicePrimaryAddr watches the primary address on etcd.
func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string, revision int64) (nextRevision int64, err error) {
serviceKey := s.servicePrimaryKey(serviceName)
watcher := clientv3.NewWatcher(s.client)
defer watcher.Close()

for {
WatchChan:
watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithRev(revision))
select {
case <-ctx.Done():
return revision, nil
case <-s.updateServicePrimaryAddrCh:
revision, err = s.updateServicePrimaryAddr(serviceName)
if err != nil {
log.Warn("update service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err))
}
goto WatchChan
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
goto WatchChan
}
if wresp.Err() != nil {
log.Error("watcher is canceled with",
zap.Int64("revision", revision),
errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err()))
return revision, wresp.Err()
}
for _, event := range wresp.Events {
switch event.Type {
case clientv3.EventTypePut:
primary.ListenUrls = nil // reset the field
primary := &tsopb.Participant{}
if err := proto.Unmarshal(event.Kv.Value, primary); err != nil {
log.Error("watch service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err))
} else {
listenUrls = primary.GetListenUrls()
listenUrls := primary.GetListenUrls()
if len(listenUrls) > 0 {
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
@@ -1759,13 +1813,34 @@ func (s *Server) watchServicePrimaryAddrLoop(serviceName string) {
}
}
case clientv3.EventTypeDelete:
log.Warn("service primary is deleted", zap.String("service-key", serviceKey))
s.servicePrimaryMap.Delete(serviceName)
}
}
revision = wresp.Header.Revision
}
}
}

// updateServicePrimaryAddr updates the primary address from etcd with get operation.
func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int64, err error) {
serviceKey := s.servicePrimaryKey(serviceName)
primary := &tsopb.Participant{}
ok, revision, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary)
listenUrls := primary.GetListenUrls()
if !ok || err != nil || len(listenUrls) == 0 {
return 0, err
}
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
log.Info("update service primary addr", zap.String("service-key", serviceKey), zap.String("primary-addr", listenUrls[0]))
return revision, nil
}

func (s *Server) servicePrimaryKey(serviceName string) string {
return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary")
}

// RecoverAllocID recover alloc id. set current base id to input id
func (s *Server) RecoverAllocID(ctx context.Context, id uint64) error {
return s.idAllocator.SetBase(id)
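Two behaviors of `watchServicePrimaryAddr` are easy to miss: a watch started at a compacted revision is canceled, with the surviving revision surfaced through `WatchResponse.CompactRevision`, and the fix resumes from that revision instead of giving up; otherwise progress is tracked via `wresp.Header.Revision`. A condensed, hedged sketch of just those branches (etcd client v3.4 API, event handling elided):

```go
// Condensed sketch of a compaction-tolerant watcher: if the requested
// revision was compacted, resume from the compact revision rather than fail.
package sketch

import (
	"context"

	"go.etcd.io/etcd/clientv3"
)

func watchFrom(ctx context.Context, cli *clientv3.Client, key string, rev int64) (int64, error) {
	watcher := clientv3.NewWatcher(cli)
	defer watcher.Close()
	for {
		watchChan := watcher.Watch(ctx, key, clientv3.WithRev(rev))
		for wresp := range watchChan {
			if wresp.CompactRevision != 0 {
				// The requested revision is gone; jump to the oldest one kept.
				rev = wresp.CompactRevision
				break // re-create the watch channel at the new revision
			}
			if err := wresp.Err(); err != nil {
				return rev, err // caller retries after an interval
			}
			// ... handle wresp.Events, then remember our progress:
			rev = wresp.Header.Revision
		}
		if ctx.Err() != nil {
			return rev, nil
		}
	}
}
```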
3 changes: 2 additions & 1 deletion tests/integrations/mcs/discovery/register_test.go
@@ -96,9 +96,10 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {
re.Equal(addr, returnedEntry.ServiceAddr)

// test primary when only one server
expectedPrimary := mcs.WaitForPrimaryServing(suite.Require(), map[string]bs.Server{addr: s})
primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName)
re.True(exist)
re.Equal(primary, addr)
re.Equal(primary, expectedPrimary)

// test API server discovery after unregister
cleanup()
