From 2b391225da65f60a4288b62152376decc7a8d613 Mon Sep 17 00:00:00 2001
From: Gyu-Ho Lee
Date: Wed, 22 Nov 2017 14:57:07 -0800
Subject: [PATCH] etcdserver: CheckInitialHashKV when "InitialCorruptCheck==true"

Signed-off-by: Gyu-Ho Lee
---
 etcdserver/corrupt.go | 98 +++++++++++++++++++++++++++++++++++--------
 1 file changed, 80 insertions(+), 18 deletions(-)

diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go
index 035c62f041c7..616e911883f4 100644
--- a/etcdserver/corrupt.go
+++ b/etcdserver/corrupt.go
@@ -16,14 +16,56 @@ package etcdserver
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/pkg/types"
 )
 
+// CheckInitialHashKV compares initial hash values with its peers
+// before serving any peer/client traffic.
+func (s *EtcdServer) CheckInitialHashKV() error {
+	if !s.Cfg.InitialCorruptCheck {
+		return nil
+	}
+
+	plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
+	h, rev, _, err := s.kv.HashByRev(0)
+	if err != nil {
+		return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
+	}
+	peers := s.getPeerHashKVs(rev)
+	mismatch := 0
+	for _, p := range peers {
+		if p.resp != nil {
+			peerID := types.ID(p.resp.Header.MemberId)
+			if h != p.resp.Hash && rev == p.resp.Header.Revision {
+				plog.Errorf("%s's hash %d != %s's hash %d (revision %d)", s.ID(), h, peerID, p.resp.Hash, rev)
+				mismatch++
+			}
+			continue
+		}
+		if p.err != nil {
+			switch p.err {
+			case rpctypes.ErrFutureRev:
+				plog.Errorf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
+			case rpctypes.ErrCompacted:
+				plog.Errorf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
+			}
+		}
+	}
+	if mismatch > 0 {
+		return fmt.Errorf("%s found data inconsistency with peers", s.ID())
+	}
+
+	plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
+	return nil
+}
+
 func (s *EtcdServer) monitorKVHash() {
 	t := s.Cfg.CorruptCheckTime
 	if t == 0 {
@@ -50,7 +92,7 @@ func (s *EtcdServer) checkHashKV() error {
 	if err != nil {
 		plog.Fatalf("failed to hash kv store (%v)", err)
 	}
-	resps := s.getPeerHashKVs(rev)
+	peers := s.getPeerHashKVs(rev)
 
 	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
 	err = s.linearizableReadNotify(ctx)
@@ -86,24 +128,27 @@ func (s *EtcdServer) checkHashKV() error {
 		mismatch(uint64(s.ID()))
 	}
 
-	for _, resp := range resps {
-		id := resp.Header.MemberId
+	for _, p := range peers {
+		if p.resp == nil {
+			continue
+		}
+		id := p.resp.Header.MemberId
 
 		// leader expects follower's latest revision less than or equal to leader's
-		if resp.Header.Revision > rev2 {
+		if p.resp.Header.Revision > rev2 {
 			plog.Warningf(
 				"revision %d from member %v, expected at most %d",
-				resp.Header.Revision,
+				p.resp.Header.Revision,
 				types.ID(id),
 				rev2)
 			mismatch(id)
 		}
 
 		// leader expects follower's latest compact revision less than or equal to leader's
-		if resp.CompactRevision > crev2 {
+		if p.resp.CompactRevision > crev2 {
 			plog.Warningf(
 				"compact revision %d from member %v, expected at most %d",
-				resp.CompactRevision,
+				p.resp.CompactRevision,
 				types.ID(id),
 				crev2,
 			)
@@ -110,12 +155,12 @@ func (s *EtcdServer) checkHashKV() error {
 			mismatch(id)
 		}
 
 		// follower's compact revision is leader's old one, then hashes must match
-		if resp.CompactRevision == crev && resp.Hash != h {
+		if p.resp.CompactRevision == crev && p.resp.Hash != h {
 			plog.Warningf(
 				"hash %d at revision %d from member %v, expected hash %d",
-				resp.Hash,
+				p.resp.Hash,
 				rev,
 				types.ID(id),
 				h,
 			)
@@ -125,36 +170,53 @@ func (s *EtcdServer) checkHashKV() error {
 	return nil
 }
 
-func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*clientv3.HashKVResponse) {
-	for _, m := range s.cluster.Members() {
+type peerHashKVResp struct {
+	resp *clientv3.HashKVResponse
+	err  error
+	eps  []string
+}
+
+func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
+	// TODO: handle the case when "s.cluster.Members" have not
+	// been populated (e.g. no snapshot to load from disk)
+	mbs := s.cluster.Members()
+	pURLs := make([][]string, len(mbs))
+	for _, m := range mbs {
 		if m.ID == s.ID() {
 			continue
 		}
+		pURLs = append(pURLs, m.PeerURLs)
+	}
+	for _, purls := range pURLs {
+		if len(purls) == 0 {
+			continue
+		}
 
 		cli, cerr := clientv3.New(clientv3.Config{
 			DialTimeout: s.Cfg.ReqTimeout(),
-			Endpoints:   m.PeerURLs,
+			Endpoints:   purls,
 		})
 		if cerr != nil {
-			plog.Warningf("%s failed to create client to peer %s for hash checking (%q)", s.ID(), types.ID(m.ID), cerr.Error())
+			plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), purls, cerr.Error())
 			continue
 		}
 
 		respsLen := len(resps)
 		for _, c := range cli.Endpoints() {
 			ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
-			resp, herr := cli.HashKV(ctx, c, rev)
+			var resp *clientv3.HashKVResponse
+			resp, cerr = cli.HashKV(ctx, c, rev)
 			cancel()
-			if herr == nil {
-				cerr = herr
-				resps = append(resps, resp)
+			if cerr == nil {
+				resps = append(resps, &peerHashKVResp{resp: resp})
 				break
 			}
+			plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
 		}
 		cli.Close()
 
 		if respsLen == len(resps) {
-			plog.Warningf("%s failed to hash kv for peer %s (%v)", s.ID(), types.ID(m.ID), cerr)
+			resps = append(resps, &peerHashKVResp{err: cerr, eps: purls})
 		}
 	}
 	return resps
 }
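
Note (reviewer sketch, not part of the patch): corrupt.go only defines the check; it has no effect unless the bootstrap path sets ServerConfig.InitialCorruptCheck and calls CheckInitialHashKV before the server starts serving. A minimal sketch of that wiring, assuming an embed-style caller where NewServer/Start are the entry points; the helper name startServer and its simplified error handling are illustrative only:

package main

import "github.com/coreos/etcd/etcdserver"

// startServer is a hypothetical bootstrap helper showing where the new
// check slots in: after NewServer, before Start.
func startServer(cfg etcdserver.ServerConfig) (*etcdserver.EtcdServer, error) {
	srv, err := etcdserver.NewServer(cfg)
	if err != nil {
		return nil, err
	}
	// No-op unless cfg.InitialCorruptCheck is true; on a hash mismatch the
	// member refuses to start instead of serving corrupted data.
	if err = srv.CheckInitialHashKV(); err != nil {
		return nil, err
	}
	srv.Start()
	return srv, nil
}

func main() {}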
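
The same comparison can also be driven from outside the cluster with the clientv3 maintenance API the patch itself uses (cli.HashKV). A standalone sketch; the endpoint URLs are placeholders for a local 3-member cluster:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// Placeholder client URLs; adjust for a real cluster.
	eps := []string{"http://127.0.0.1:2379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"}
	cli, err := clientv3.New(clientv3.Config{Endpoints: eps, DialTimeout: 5 * time.Second})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Hash the first member's keyspace up to its latest revision (rev=0),
	// mirroring the server-side s.kv.HashByRev(0).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	first, err := cli.HashKV(ctx, eps[0], 0)
	cancel()
	if err != nil {
		panic(err)
	}
	rev := first.Header.Revision

	// Ask every other member for its hash at the same revision; as in the
	// patch, hashes are only comparable when the revisions match.
	for _, ep := range eps[1:] {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		resp, herr := cli.HashKV(ctx, ep, rev)
		cancel()
		if herr != nil {
			// e.g. rpctypes.ErrCompacted / ErrFutureRev when members diverge
			fmt.Printf("%s: hash-kv error (%v)\n", ep, herr)
			continue
		}
		if resp.Header.Revision == rev && resp.Hash != first.Hash {
			fmt.Printf("%s: hash %d != %d at revision %d\n", ep, resp.Hash, first.Hash, rev)
		}
	}
}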