diff --git a/etcdserver/raft.go b/etcdserver/raft.go
index b87ceea54660..a7867db684d6 100644
--- a/etcdserver/raft.go
+++ b/etcdserver/raft.go
@@ -83,7 +83,8 @@ type RaftTimer interface {
 type apply struct {
 	entries  []raftpb.Entry
 	snapshot raftpb.Snapshot
-	raftDone <-chan struct{} // rx {} after raft has persisted messages
+	// notifyc synchronizes etcd server applies with the raft node
+	notifyc chan struct{}
 }
 
 type raftNode struct {
@@ -190,11 +191,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					}
 				}
 
-				raftDone := make(chan struct{}, 1)
+				notifyc := make(chan struct{}, 1)
 				ap := apply{
 					entries:  rd.CommittedEntries,
 					snapshot: rd.Snapshot,
-					raftDone: raftDone,
+					notifyc:  notifyc,
 				}
 
 				updateCommittedIndex(&ap, rh)
@@ -227,10 +228,14 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
 						plog.Fatalf("raft save snapshot error: %v", err)
 					}
+					// etcdserver now claims the snapshot has been persisted onto the disk
+					notifyc <- struct{}{}
+
 					// gofail: var raftAfterSaveSnap struct{}
 					r.raftStorage.ApplySnapshot(rd.Snapshot)
 					plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
 					// gofail: var raftAfterApplySnap struct{}
+
 				}
 
 				r.raftStorage.Append(rd.Entries)
@@ -240,7 +245,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					msgs := r.processMessages(rd.Messages)
 
 					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
-					raftDone <- struct{}{}
+					notifyc <- struct{}{}
 
 					// Candidate or follower needs to wait for all pending configuration
 					// changes to be applied before sending messages.
@@ -259,9 +264,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					if waitApply {
 						// blocks until 'applyAll' calls 'applyWait.Trigger'
 						// to be in sync with scheduled config-change job
-						// (assume raftDone has cap of 1)
+						// (assume notifyc has cap of 1)
 						select {
-						case raftDone <- struct{}{}:
+						case notifyc <- struct{}{}:
 						case <-r.stopped:
 							return
 						}
@@ -271,7 +276,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					r.transport.Send(msgs)
 				} else {
 					// leader already processed 'MsgSnap' and signaled
-					raftDone <- struct{}{}
+					notifyc <- struct{}{}
 				}
 
 				r.Advance()
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 33430276bff6..1c2a95c05b58 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -274,20 +274,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
 	beExist := fileutil.Exist(bepath)
 
-	var be backend.Backend
-	beOpened := make(chan struct{})
-	go func() {
-		be = newBackend(bepath, cfg.QuotaBackendBytes)
-		beOpened <- struct{}{}
-	}()
-
-	select {
-	case <-beOpened:
-	case <-time.After(time.Second):
-		plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
-		plog.Warningf("waiting for it to exit before starting...")
-		<-beOpened
-	}
+	be := openBackend(bepath, cfg.QuotaBackendBytes)
 
 	defer func() {
 		if err != nil {
@@ -385,6 +372,11 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 			plog.Panicf("recovered store from snapshot error: %v", err)
 		}
 		plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
+
+		be, err = checkAndRecoverDB(snapshot, be, cfg.QuotaBackendBytes, cfg.SnapDir())
+		if err != nil {
+			plog.Panicf("recovering backend from snapshot error: %v", err)
+		}
 	}
 	cfg.Print()
 	if !cfg.ForceNewCluster {
@@ -778,7 +770,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 	// wait for the raft routine to finish the disk writes before triggering a
 	// snapshot. or applied index might be greater than the last index in raft
 	// storage, since the raft routine might be slower than apply routine.
-	<-apply.raftDone
+	<-apply.notifyc
 	s.triggerSnapshot(ep)
 
 	select {
@@ -803,6 +795,9 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			apply.snapshot.Metadata.Index, ep.appliedi)
 	}
 
+	// wait for raftNode to persist snapshot onto the disk
+	<-apply.notifyc
+
 	snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
 	if err != nil {
 		plog.Panicf("get database snapshot file path error: %v", err)
diff --git a/etcdserver/util.go b/etcdserver/util.go
index e3896ffc2d3d..12fbe87e126e 100644
--- a/etcdserver/util.go
+++ b/etcdserver/util.go
@@ -15,11 +15,19 @@
 package etcdserver
 
 import (
+	"fmt"
+	"os"
+	"path/filepath"
 	"time"
 
+	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/mvcc"
+	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/rafthttp"
+	"github.com/coreos/etcd/snap"
 )
 
 // isConnectedToQuorumSince checks whether the local member is connected to the
 // quorum of the cluster since the given time.
@@ -95,3 +103,56 @@ func (nc *notifier) notify(err error) {
 	nc.err = err
 	close(nc.c)
 }
+
+// checkAndRecoverDB attempts to recover the backend db for the scenario
+// where the etcd server crashed after persisting a snapshot received from
+// the leader but before updating its db; the snapshot on disk can then be
+// newer than the db (snapshot.Metadata.Index > db.consistentIndex).
+//
+// when that happens:
+// 1. find the xxx.snap.db that matches the snapshot index.
+// 2. rename xxx.snap.db to db.
+// 3. open the new db as the backend.
+func checkAndRecoverDB(snapshot *raftpb.Snapshot, oldbe backend.Backend, quotaBackendBytes int64, snapdir string) (be backend.Backend, err error) {
+	var cIndex consistentIndex
+	kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
+	defer kv.Close()
+	kvindex := kv.ConsistentIndex()
+	if snapshot.Metadata.Index <= kvindex {
+		return oldbe, nil
+	}
+
+	id := snapshot.Metadata.Index
+	snapfn, err := snap.DBFilePathFromID(snapdir, id)
+	if err != nil {
+		return nil, fmt.Errorf("finding %v error: %v", filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", id)), err)
+	}
+
+	bepath := filepath.Join(snapdir, databaseFilename)
+	if err := os.Rename(snapfn, bepath); err != nil {
+		return nil, fmt.Errorf("rename snapshot file error: %v", err)
+	}
+
+	oldbe.Close()
+	be = openBackend(bepath, quotaBackendBytes)
+	return be, nil
+}
+
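+// openBackend opens the backend db at bepath in a background goroutine,
+// warning if another etcd process still holds the data-dir file lock.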
+func openBackend(bepath string, quotaBackendBytes int64) (be backend.Backend) {
+	beOpened := make(chan struct{})
+	go func() {
+		be = newBackend(bepath, quotaBackendBytes)
+		beOpened <- struct{}{}
+	}()
+
+	select {
+	case <-beOpened:
+	case <-time.After(time.Second):
+		plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
+		plog.Warningf("waiting for it to exit before starting...")
+		<-beOpened
+	}
+	return be
+}
diff --git a/snap/db.go b/snap/db.go
index ae3c743f80c1..77d109141e90 100644
--- a/snap/db.go
+++ b/snap/db.go
@@ -15,6 +15,7 @@
 package snap
 
 import (
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -24,6 +25,8 @@ import (
 	"github.com/coreos/etcd/pkg/fileutil"
 )
 
+var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")
+
 // SaveDBFrom saves snapshot of the database from the given reader. It
 // guarantees the save operation is atomic.
 func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
@@ -60,15 +63,21 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
 // DBFilePath returns the file path for the snapshot of the database with
 // given id. If the snapshot does not exist, it returns error.
 func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
-	fns, err := fileutil.ReadDir(s.dir)
+	return DBFilePathFromID(s.dir, id)
+}
+
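+// DBFilePathFromID returns the file path for the snapshot of the database
+// with the given id, looking under dbPath rather than a Snapshotter's dir.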
+func DBFilePathFromID(dbPath string, id uint64) (string, error) {
+	fns, err := fileutil.ReadDir(dbPath)
 	if err != nil {
 		return "", err
 	}
 	wfn := fmt.Sprintf("%016x.snap.db", id)
 	for _, fn := range fns {
 		if fn == wfn {
-			return filepath.Join(s.dir, fn), nil
+			return filepath.Join(dbPath, fn), nil
 		}
 	}
-	return "", fmt.Errorf("snap: snapshot file doesn't exist")
+	return "", ErrNoDBSnapshot
 }
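Beyond the patch itself, here is a minimal, runnable sketch of the find-and-rename recovery that checkAndRecoverDB performs on boot. The findSnapDB helper, the temp-dir setup, and the file contents are illustrative stand-ins, not etcd APIs; a real server would reopen the renamed file as its bolt backend instead of printing.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// findSnapDB mirrors what snap.DBFilePathFromID does: look for
// "<index as %016x>.snap.db" under dir and return its full path.
func findSnapDB(dir string, id uint64) (string, error) {
	fn := fmt.Sprintf("%016x.snap.db", id)
	p := filepath.Join(dir, fn)
	if _, err := os.Stat(p); err != nil {
		return "", fmt.Errorf("snapshot db %q doesn't exist", fn)
	}
	return p, nil
}

func main() {
	dir, err := ioutil.TempDir("", "snapdb-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Simulate the crash window: snapshot index 42 was persisted as
	// 000000000000002a.snap.db, but the backend "db" was never swapped in.
	const snapIndex uint64 = 42
	sent := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", snapIndex))
	if err := ioutil.WriteFile(sent, []byte("backend contents"), 0600); err != nil {
		panic(err)
	}

	// Steps 1-3 of checkAndRecoverDB: find the matching snap.db,
	// rename it over the backend path, then reopen it as the backend.
	snapfn, err := findSnapDB(dir, snapIndex)
	if err != nil {
		panic(err)
	}
	bepath := filepath.Join(dir, "db")
	if err := os.Rename(snapfn, bepath); err != nil {
		panic(err)
	}
	fmt.Println("backend recovered at", bepath) // a real server reopens bolt here
}
```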