Skip to content

Commit

Permalink
server: fix watch keyspace revision (#6251)
Browse files Browse the repository at this point in the history
ref #5895

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
rleungx and ti-chi-bot committed Apr 4, 2023
1 parent f448b61 commit da43376
Showing 1 changed file with 40 additions and 12 deletions.
52 changes: 40 additions & 12 deletions server/keyspace_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/storage/endpoint"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// KeyspaceServer wraps GrpcServer to provide keyspace service.
Expand Down Expand Up @@ -82,18 +85,32 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques
ctx, cancel := context.WithCancel(s.Context())
defer cancel()

err := s.sendAllKeyspaceMeta(ctx, stream)
revision, err := s.sendAllKeyspaceMeta(ctx, stream)
if err != nil {
return err
}
watchChan := s.client.Watch(ctx, path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()), clientv3.WithPrefix())

watcher := clientv3.NewWatcher(s.client)
defer watcher.Close()

for {
select {
case <-ctx.Done():
return nil
case res := <-watchChan:
keyspaces := make([]*keyspacepb.KeyspaceMeta, 0, len(res.Events))
for _, event := range res.Events {
rch := watcher.Watch(ctx, path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()), clientv3.WithPrefix(), clientv3.WithRev(revision))
for wresp := range rch {
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
break
}
if wresp.Canceled {
log.Error("watcher is canceled with",
zap.Int64("revision", revision),
errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err()))
return wresp.Err()
}
keyspaces := make([]*keyspacepb.KeyspaceMeta, 0, len(wresp.Events))
for _, event := range wresp.Events {
if event.Type != clientv3.EventTypePut {
continue
}
Expand All @@ -109,23 +126,34 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques
}
}
}
select {
case <-ctx.Done():
// server closed, return
return nil
default:
}
}
}

func (s *KeyspaceServer) sendAllKeyspaceMeta(ctx context.Context, stream keyspacepb.Keyspace_WatchKeyspacesServer) error {
func (s *KeyspaceServer) sendAllKeyspaceMeta(ctx context.Context, stream keyspacepb.Keyspace_WatchKeyspacesServer) (int64, error) {
getResp, err := s.client.Get(ctx, path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()), clientv3.WithPrefix())
if err != nil {
return err
return 0, err
}
metas := make([]*keyspacepb.KeyspaceMeta, getResp.Count)
for i, kv := range getResp.Kvs {
meta := &keyspacepb.KeyspaceMeta{}
if err = proto.Unmarshal(kv.Value, meta); err != nil {
return err
return 0, err
}
metas[i] = meta
}
return stream.Send(&keyspacepb.WatchKeyspacesResponse{Header: s.header(), Keyspaces: metas})
var revision int64
if getResp.Header != nil {
// start from the next revision
revision = getResp.Header.GetRevision() + 1
}
return revision, stream.Send(&keyspacepb.WatchKeyspacesResponse{Header: s.header(), Keyspaces: metas})
}

// UpdateKeyspaceState updates the state of keyspace specified in the request.
Expand Down

0 comments on commit da43376

Please sign in to comment.