Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

database/raft: save snapshot position first #1310

Merged
merged 3 commits into from
Jun 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions database/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Service struct {
// All client code accesses the cluster state via our Service
// object, which keeps a local, in-memory copy of the
// complete current state.
wal *wal.WAL
raftNode raft.Node
raftStorage *raft.MemoryStorage

Expand Down Expand Up @@ -177,19 +178,20 @@ func Start(laddr, dir, bootURL string, httpClient *http.Client, useTLS bool) (*S
sv.mux.HandleFunc("/raft/join", sv.serveJoin)
sv.mux.HandleFunc("/raft/msg", sv.serveMsg)

walobj, err := sv.recover()
var err error
sv.wal, err = sv.recover()
if err != nil {
return nil, err
}

recover := walobj != nil
recover := sv.wal != nil
if recover {
sv.id, err = readID(sv.dir)
if err != nil {
return nil, errors.Wrap(err)
}
} else if bootURL != "" {
walobj, err = sv.join(laddr, bootURL) // sets sv.id and state
sv.wal, err = sv.join(laddr, bootURL) // sets sv.id and state
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -225,14 +227,14 @@ func Start(laddr, dir, bootURL string, httpClient *http.Client, useTLS bool) (*S
if err != nil {
return nil, errors.Wrap(err)
}
walobj, err = wal.Create(sv.walDir(), nil)
sv.wal, err = wal.Create(sv.walDir(), nil)
if err != nil {
return nil, errors.Wrap(err)
}
sv.raftNode = raft.StartNode(c, []raft.Peer{{ID: 1, Context: []byte(laddr)}})
}

go sv.runUpdates(walobj)
go sv.runUpdates(sv.wal)
go runTicks(sv.raftNode)

return sv, nil
Expand Down Expand Up @@ -262,18 +264,19 @@ func (sv *Service) runUpdatesReady(rd raft.Ready, wal *wal.WAL, writers map[stri
wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
sv.redo(func() error {
return sv.saveSnapshot(&rd.Snapshot)
})
sv.redo(func() error {
// Note: wal.SaveSnapshot saves the snapshot *position*,
// not the actual full snapshot data.
// That happens in sv.saveSnapshot just above.
// (So don't worry, we're not saving it twice.)
// Note: wal.SaveSnapshot saves the snapshot *position* only,
// not the actual full snapshot data. The data is saved below
// in sv.saveSnapshot. The position must be saved to the WAL
// before we try to load the WAL at the snapshot position.
// Writing the snapshot position first guarantees that.
return wal.SaveSnapshot(walpb.Snapshot{
Index: rd.Snapshot.Metadata.Index,
Term: rd.Snapshot.Metadata.Term,
})
})
sv.redo(func() error {
return sv.saveSnapshot(&rd.Snapshot)
})
err := wal.ReleaseLockTo(rd.Snapshot.Metadata.Index)
if err != nil {
panic(err)
Expand Down Expand Up @@ -663,17 +666,17 @@ func (sv *Service) join(addr, baseURL string) (*wal.WAL, error) {
}

if !raft.IsEmptySnap(raftSnap) {
err := sv.saveSnapshot(&raftSnap)
if err != nil {
return nil, errors.Wrap(err)
}
err = wal.SaveSnapshot(walpb.Snapshot{
err := wal.SaveSnapshot(walpb.Snapshot{
Index: raftSnap.Metadata.Index,
Term: raftSnap.Metadata.Term,
})
if err != nil {
return nil, errors.Wrap(err)
}
err = sv.saveSnapshot(&raftSnap)
if err != nil {
return nil, errors.Wrap(err)
}
err = sv.state.RestoreSnapshot(raftSnap.Data, raftSnap.Metadata.Index)
if err != nil {
return nil, errors.Wrap(err)
Expand Down Expand Up @@ -868,7 +871,20 @@ func (sv *Service) getSnapshot() *raftpb.Snapshot {

func (sv *Service) triggerSnapshot() error {
snap := sv.getSnapshot()
err := sv.saveSnapshot(snap)

// First, write the index of the snapshot to the WAL. This
// ensures we never try to open the WAL at an index that was
// not saved to the WAL.
// https://github.com/coreos/etcd/issues/8082
err := sv.wal.SaveSnapshot(walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
})
if err != nil {
return errors.Wrap(err)
}

err = sv.saveSnapshot(snap)
if err != nil {
return errors.Wrap(err)
}
Expand Down
2 changes: 1 addition & 1 deletion generated/rev/RevId.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

public final class RevId {
public final String Id = "1.2-stable/rev3137";
public final String Id = "1.2-stable/rev3138";
}
2 changes: 1 addition & 1 deletion generated/rev/revid.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package rev

const ID string = "1.2-stable/rev3137"
const ID string = "1.2-stable/rev3138"
2 changes: 1 addition & 1 deletion generated/rev/revid.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

export const rev_id = "1.2-stable/rev3137"
export const rev_id = "1.2-stable/rev3138"
2 changes: 1 addition & 1 deletion generated/rev/revid.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

module Chain::Rev
ID = "1.2-stable/rev3137".freeze
ID = "1.2-stable/rev3138".freeze
end