From da43376badee1063d4c6ca0ce56a39ffef69aeb7 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 4 Apr 2023 17:52:58 +0800 Subject: [PATCH] server: fix watch keyspace revision (#6251) ref tikv/pd#5895 Signed-off-by: Ryan Leung Co-authored-by: Ti Chi Robot --- server/keyspace_service.go | 52 +++++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/server/keyspace_service.go b/server/keyspace_service.go index 5255d725815..64e646119ed 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -22,9 +22,12 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/storage/endpoint" "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" ) // KeyspaceServer wraps GrpcServer to provide keyspace service. @@ -82,18 +85,32 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques ctx, cancel := context.WithCancel(s.Context()) defer cancel() - err := s.sendAllKeyspaceMeta(ctx, stream) + revision, err := s.sendAllKeyspaceMeta(ctx, stream) if err != nil { return err } - watchChan := s.client.Watch(ctx, path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()), clientv3.WithPrefix()) + + watcher := clientv3.NewWatcher(s.client) + defer watcher.Close() + for { - select { - case <-ctx.Done(): - return nil - case res := <-watchChan: - keyspaces := make([]*keyspacepb.KeyspaceMeta, 0, len(res.Events)) - for _, event := range res.Events { + rch := watcher.Watch(ctx, path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()), clientv3.WithPrefix(), clientv3.WithRev(revision)) + for wresp := range rch { + if wresp.CompactRevision != 0 { + log.Warn("required revision has been compacted, use the compact revision", + zap.Int64("required-revision", revision), + zap.Int64("compact-revision", wresp.CompactRevision)) + revision = wresp.CompactRevision + break + } + if wresp.Canceled { + log.Error("watcher is canceled with", + zap.Int64("revision", revision), + errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err())) + return wresp.Err() + } + keyspaces := make([]*keyspacepb.KeyspaceMeta, 0, len(wresp.Events)) + for _, event := range wresp.Events { if event.Type != clientv3.EventTypePut { continue } @@ -109,23 +126,34 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques } } } + select { + case <-ctx.Done(): + // server closed, return + return nil + default: + } } } -func (s *KeyspaceServer) sendAllKeyspaceMeta(ctx context.Context, stream keyspacepb.Keyspace_WatchKeyspacesServer) error { +func (s *KeyspaceServer) sendAllKeyspaceMeta(ctx context.Context, stream keyspacepb.Keyspace_WatchKeyspacesServer) (int64, error) { getResp, err := s.client.Get(ctx, path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()), clientv3.WithPrefix()) if err != nil { - return err + return 0, err } metas := make([]*keyspacepb.KeyspaceMeta, getResp.Count) for i, kv := range getResp.Kvs { meta := &keyspacepb.KeyspaceMeta{} if err = proto.Unmarshal(kv.Value, meta); err != nil { - return err + return 0, err } metas[i] = meta } - return stream.Send(&keyspacepb.WatchKeyspacesResponse{Header: s.header(), Keyspaces: metas}) + var revision int64 + if getResp.Header != nil { + // start from the next revision + revision = getResp.Header.GetRevision() + 1 + } + return revision, stream.Send(&keyspacepb.WatchKeyspacesResponse{Header: s.header(), Keyspaces: metas}) } // UpdateKeyspaceState updates the state of keyspace specified in the request.