Skip to content

Commit

Permalink
add update cache force
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Apr 7, 2023
1 parent 2c43e7a commit 4cd6edd
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 28 deletions.
12 changes: 9 additions & 3 deletions pkg/utils/tsoutil/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tsoutil

import (
"context"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -57,20 +58,20 @@ func NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize prometheus.Histo

// DispatchRequest is the entry point for dispatching/forwarding a tso request to the detination host
func (s *TSODispatcher) DispatchRequest(
ctx context.Context, req Request, tsoProtoFactory ProtoFactory, doneCh <-chan struct{}, errCh chan<- error) {
ctx context.Context, req Request, tsoProtoFactory ProtoFactory, doneCh <-chan struct{}, errCh chan<- error, updateServicePrimaryAddrChs ...chan<- struct{}) {
val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests))
reqCh := val.(chan Request)
if !loaded {
tsDeadlineCh := make(chan deadline, 1)
go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh)
go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh, updateServicePrimaryAddrChs...)
go watchTSDeadline(ctx, tsDeadlineCh)
}
reqCh <- req
}

func (s *TSODispatcher) dispatch(
ctx context.Context, tsoProtoFactory ProtoFactory, forwardedHost string, clientConn *grpc.ClientConn,
tsoRequestCh <-chan Request, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) {
tsoRequestCh <-chan Request, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error, updateServicePrimaryAddrChs ...chan<- struct{}) {
dispatcherCtx, ctxCancel := context.WithCancel(ctx)
defer ctxCancel()
defer s.dispatchChs.Delete(forwardedHost)
Expand Down Expand Up @@ -121,6 +122,11 @@ func (s *TSODispatcher) dispatch(
log.Error("proxy forward tso error",
zap.String("forwarded-host", forwardedHost),
errs.ZapError(errs.ErrGRPCSend, err))
if len(updateServicePrimaryAddrChs) > 0 {
if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) {
updateServicePrimaryAddrChs[0] <- struct{}{}
}
}
select {
case <-dispatcherCtx.Done():
return
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
}

tsoRequest := tsoutil.NewPDProtoRequest(forwardedHost, clientConn, request, stream)
s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh)
s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh, s.updateServicePrimaryAddrCh)
continue
}

Expand Down
67 changes: 43 additions & 24 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,10 @@ type Server struct {

auditBackends []audit.Backend

registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
updateServicePrimaryAddrCh chan struct{}
}

// HandlerBuilder builds a server HTTP handler.
Expand Down Expand Up @@ -249,6 +250,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le
}{
clients: make(map[string]tsopb.TSO_TsoClient),
},
updateServicePrimaryAddrCh: make(chan struct{}, 1),
}
s.handler = newHandler(s)

Expand Down Expand Up @@ -1726,32 +1728,25 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) {
defer cancel()

serviceKey := s.servicePrimaryKey(serviceName)
var revision int64
var (
revision int64
err error
)
for i := 0; i < maxRetryTimesGetServicePrimary; i++ {
select {
case <-ctx.Done():
return
case <-time.After(retryIntervalGetServicePrimary):
}
primary := &tsopb.Participant{}
ok, r, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary)
if err != nil {
log.Error("get service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err))
continue
}
listenUrls := primary.GetListenUrls()
if ok && len(listenUrls) > 0 {
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
revision = r
log.Info("get service primary addr", zap.String("service-key", serviceKey), zap.String("primary-addr", listenUrls[0]))
revision, err = s.updateServicePrimaryAddr(serviceName)
if err == nil {
break
}
}
if revision == 0 {
log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey))
}
log.Info("start to watch", zap.String("service-key", serviceKey))
log.Info("start to watch service primary addr", zap.String("service-key", serviceKey))
for {
select {
case <-ctx.Done():
Expand All @@ -1771,20 +1766,49 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) {
}
}

// SetServicePrimaryAddr sets the primary address directly.
// Note: This function is only used for test.
func (s *Server) SetServicePrimaryAddr(serviceName, addr string) {
s.servicePrimaryMap.Store(serviceName, addr)
}

// updateServicePrimaryAddr updates the primary address from etcd with get operation.
func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int64, err error) {
serviceKey := s.servicePrimaryKey(serviceName)
primary := &tsopb.Participant{}
ok, revision, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary)
listenUrls := primary.GetListenUrls()
if !ok || err != nil || len(listenUrls) == 0 {
return 0, err
}
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
log.Info("update service primary addr", zap.String("service-key", serviceKey), zap.String("primary-addr", listenUrls[0]))
return revision, nil
}

// watchServicePrimaryAddr watches the primary address on etcd.
func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string, revision int64) (nextRevision int64, err error) {
serviceKey := s.servicePrimaryKey(serviceName)
watcher := clientv3.NewWatcher(s.client)
defer watcher.Close()

for {
WatchChan:
watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithPrefix(), clientv3.WithRev(revision))
for wresp := range watchChan {
select {
case <-ctx.Done():
return revision, nil
case <-s.updateServicePrimaryAddrCh:
revision, err = s.updateServicePrimaryAddr(serviceName)
goto WatchChan
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
break
goto WatchChan
}
if wresp.Err() != nil {
log.Error("watcher is canceled with",
Expand Down Expand Up @@ -1814,11 +1838,6 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string
}
revision = wresp.Header.Revision
}
select {
case <-ctx.Done():
return revision, nil
default:
}
}
}

Expand Down

0 comments on commit 4cd6edd

Please sign in to comment.