Skip to content

Commit

Permalink
net/raft: save snapshot position first
Browse files (browse the repository at this point in the history)
Save the snapshot position to the WAL before saving the snapshot data to
disk. On boot, we open the WAL at the most recently saved snapshot's
position. If we save the snapshot data first and fail before its position
reaches the WAL (for example, by crashing between the two writes), the
next boot might open the WAL at a snapshot position that was never saved
to the WAL.

We might want to wait until the CoreOS folks confirm this in
etcd-io/etcd#8082.
  • Loading branch information
jbowens committed Jun 12, 2017
1 parent 2d7fce5 commit 0f56c2a
Showing 1 changed file with 40 additions and 23 deletions.
63 changes: 40 additions & 23 deletions net/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type Service struct {
// it is ok to read without keeping startMu locked in
// code paths where Service is known to be initialized.
startMu sync.Mutex
wal *wal.WAL
raftNode raft.Node
id uint64

Expand Down Expand Up @@ -215,14 +216,15 @@ func Start(laddr, dir string, httpClient *http.Client, state State) (*Service, e
sv.mux.HandleFunc("/raft/join", sv.serveJoin)
sv.mux.HandleFunc("/raft/msg", sv.serveMsg)

walobj, err := sv.recover()
var err error
sv.wal, err = sv.recover()
if err != nil {
return nil, err
}
// If there's no WAL, then this is a new node. The caller is responsible
// for calling either Init to initialize a new cluster or Join to join
// an existing cluster.
if walobj == nil {
if sv.wal == nil {
return sv, nil
}

Expand All @@ -241,15 +243,15 @@ func Start(laddr, dir string, httpClient *http.Client, state State) (*Service, e
// sv hasn't escaped yet.
sv.id = id
sv.raftNode = raftNode
sv.startLocked(walobj)
sv.startLocked()

return sv, nil
}

// startLocked begins the raft algorithm. It requires sv.startMu
// to already be locked.
func (sv *Service) startLocked(walobj *wal.WAL) {
go sv.runUpdates(walobj)
func (sv *Service) startLocked() {
go sv.runUpdates()
go runTicks(sv.raftNode)
}

Expand Down Expand Up @@ -296,7 +298,7 @@ func (sv *Service) Init() error {
if err != nil {
return errors.Wrap(err)
}
walobj, err := wal.Create(sv.walDir(), nil)
sv.wal, err = wal.Create(sv.walDir(), nil)
if err != nil {
return errors.Wrap(err)
}
Expand All @@ -311,9 +313,9 @@ func (sv *Service) Init() error {
// each peer (in our case, just this node). We can't campaign until
// this entry is applied, so synchronously apply them before continuing.
rd := <-raftNode.Ready()
sv.runUpdatesReady(rd, walobj, map[string]chan bool{})
sv.runUpdatesReady(rd, sv.wal, map[string]chan bool{})

sv.startLocked(walobj)
sv.startLocked()

// campaign immediately to avoid waiting electionTick ticks in tests
err = raftNode.Campaign(ctx)
Expand Down Expand Up @@ -342,8 +344,9 @@ func (sv *Service) Join(bootURL string) error {
return err
}
sv.id = id
sv.wal = walobj
sv.raftNode = raft.RestartNode(sv.config(id))
sv.startLocked(walobj)
sv.startLocked()
return nil
}

Expand All @@ -361,18 +364,19 @@ func (sv *Service) runUpdatesReady(rd raft.Ready, wal *wal.WAL, writers map[stri
wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
sv.redo(func() error {
return sv.saveSnapshot(&rd.Snapshot)
})
sv.redo(func() error {
// Note: wal.SaveSnapshot saves the snapshot *position*,
// not the actual full snapshot data.
// That happens in sv.saveSnapshot just above.
// (So don't worry, we're not saving it twice.)
// Note: wal.SaveSnapshot saves the snapshot *position* only,
// not the actual full snapshot data. The data is saved below
// in sv.saveSnapshot. The position must be saved to the WAL
// before we try to load the WAL at the snapshot position.
// Writing the snapshot position first guarantees that.
return wal.SaveSnapshot(walpb.Snapshot{
Index: rd.Snapshot.Metadata.Index,
Term: rd.Snapshot.Metadata.Term,
})
})
sv.redo(func() error {
return sv.saveSnapshot(&rd.Snapshot)
})
err := wal.ReleaseLockTo(rd.Snapshot.Metadata.Index)
if err != nil {
panic(err)
Expand Down Expand Up @@ -420,14 +424,14 @@ func replyReadIndex(rdIndices map[string]chan uint64, readStates []raft.ReadStat

// runUpdates runs forever, reading and processing updates from raft
// onto local storage.
func (sv *Service) runUpdates(wal *wal.WAL) {
func (sv *Service) runUpdates() {
rdIndices := make(map[string]chan uint64)
writers := make(map[string]chan bool)
for {
select {
case rd := <-sv.raftNode.Ready():
replyReadIndex(rdIndices, rd.ReadStates)
sv.runUpdatesReady(rd, wal, writers)
sv.runUpdatesReady(rd, sv.wal, writers)
case req := <-sv.rctxReq:
if req.index == nil {
delete(rdIndices, string(req.rctx))
Expand Down Expand Up @@ -734,17 +738,17 @@ func (sv *Service) join(addr, baseURL string) (id uint64, walobj *wal.WAL, err e
}

if !raft.IsEmptySnap(raftSnap) {
err := sv.saveSnapshot(&raftSnap)
if err != nil {
return 0, nil, errors.Wrap(err)
}
err = walobj.SaveSnapshot(walpb.Snapshot{
Index: raftSnap.Metadata.Index,
Term: raftSnap.Metadata.Term,
})
if err != nil {
return 0, nil, errors.Wrap(err)
}
err := sv.saveSnapshot(&raftSnap)
if err != nil {
return 0, nil, errors.Wrap(err)
}
err = sv.state.RestoreSnapshot(raftSnap.Data, raftSnap.Metadata.Index)
if err != nil {
return 0, nil, errors.Wrap(err)
Expand Down Expand Up @@ -928,7 +932,20 @@ func (sv *Service) getSnapshot() *raftpb.Snapshot {

func (sv *Service) triggerSnapshot() error {
snap := sv.getSnapshot()
err := sv.saveSnapshot(snap)

// First, write the index of the snapshot to the WAL. This
// ensures we never try to open the WAL at an index that was
// not saved to the WAL.
// https://github.com/coreos/etcd/issues/8082
err := sv.wal.SaveSnapshot(walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
})
if err != nil {
return errors.Wrap(err)
}

err = sv.saveSnapshot(snap)
if err != nil {
return errors.Wrap(err)
}
Expand Down

0 comments on commit 0f56c2a

Please sign in to comment.