Skip to content

Commit

Permalink
server: Written Snapshot's to WAL contains populated ConfState.
Browse files Browse the repository at this point in the history
This will (among others) allow etcd-3.6 to not depend on store_v2 .snap files at all,
as WAL + db file will be self-sufficient.
  • Loading branch information
ptabor committed Mar 4, 2021
1 parent 6dd638d commit 917895e
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 46 deletions.
2 changes: 1 addition & 1 deletion etcdctl/ctlv2/command/backup_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) {
log.Fatal(err)
}
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState
newss := snap.New(zap.NewExample(), destSnap)
if err = newss.SaveSnap(*snapshot); err != nil {
log.Fatal(err)
Expand Down
15 changes: 9 additions & 6 deletions etcdctl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ func (s *v3Manager) saveDB() error {
}

// saveWALAndSnap creates a WAL for the initial cluster
//
// TODO: This code ignores learners !!!
func (s *v3Manager) saveWALAndSnap() error {
if err := fileutil.CreateDirAll(s.walDir); err != nil {
return err
Expand Down Expand Up @@ -453,19 +455,20 @@ func (s *v3Manager) saveWALAndSnap() error {
if berr != nil {
return berr
}
confState := raftpb.ConfState{
Voters: nodeIDs,
}
raftSnap := raftpb.Snapshot{
Data: b,
Metadata: raftpb.SnapshotMetadata{
Index: commit,
Term: term,
ConfState: raftpb.ConfState{
Voters: nodeIDs,
},
Index: commit,
Term: term,
ConfState: confState,
},
}
sn := snap.New(s.lg, s.snapDir)
if err := sn.SaveSnap(raftSnap); err != nil {
return err
}
return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term})
return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term, ConfState: &confState})
}
5 changes: 3 additions & 2 deletions server/etcdserver/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
walsnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
ConfState: &snap.Metadata.ConfState,
}
// save the snapshot file before writing the snapshot to the wal.
// This makes it possible for the snapshot file to become orphaned, but prevents
Expand Down
5 changes: 4 additions & 1 deletion server/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,6 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
}
}
snaps = snaps[:n:n]

return snaps, nil
}

Expand Down Expand Up @@ -942,6 +941,10 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
}

func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
if err := walpb.ValidateSnapshotForWrite(&e); err != nil {
return err
}

b := pbutil.MustMarshal(&e)

w.mu.Lock()
Expand Down
79 changes: 43 additions & 36 deletions server/wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,15 @@ import (
"go.uber.org/zap"
)

var (
confState = raftpb.ConfState{
Voters: []uint64{0x00ffca74},
AutoLeave: false,
}
)

func TestNew(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -90,7 +97,7 @@ func TestNew(t *testing.T) {
}

func TestCreateFailFromPollutedDir(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand All @@ -104,7 +111,7 @@ func TestCreateFailFromPollutedDir(t *testing.T) {
}

func TestWalCleanup(t *testing.T) {
testRoot, err := ioutil.TempDir(os.TempDir(), "waltestroot")
testRoot, err := ioutil.TempDir(t.TempDir(), "waltestroot")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -135,7 +142,7 @@ func TestWalCleanup(t *testing.T) {
}

func TestCreateFailFromNoSpaceLeft(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand All @@ -154,7 +161,7 @@ func TestCreateFailFromNoSpaceLeft(t *testing.T) {
}

func TestNewForInitedDir(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand All @@ -167,7 +174,7 @@ func TestNewForInitedDir(t *testing.T) {
}

func TestOpenAtIndex(t *testing.T) {
dir, err := ioutil.TempDir(os.TempDir(), "waltest")
dir, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -210,7 +217,7 @@ func TestOpenAtIndex(t *testing.T) {
}
w.Close()

emptydir, err := ioutil.TempDir(os.TempDir(), "waltestempty")
emptydir, err := ioutil.TempDir(t.TempDir(), "waltestempty")
if err != nil {
t.Fatal(err)
}
Expand All @@ -224,7 +231,7 @@ func TestOpenAtIndex(t *testing.T) {
// The test creates a WAL directory and cuts out multiple WAL files. Then
// it corrupts one of the files by completely truncating it.
func TestVerify(t *testing.T) {
walDir, err := ioutil.TempDir(os.TempDir(), "waltest")
walDir, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -273,7 +280,7 @@ func TestVerify(t *testing.T) {

// TODO: split it into smaller tests for better readability
func TestCut(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -304,7 +311,7 @@ func TestCut(t *testing.T) {
if err = w.cut(); err != nil {
t.Fatal(err)
}
snap := walpb.Snapshot{Index: 2, Term: 1}
snap := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
if err = w.SaveSnapshot(snap); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -335,7 +342,7 @@ func TestCut(t *testing.T) {
}

func TestSaveWithCut(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -398,7 +405,7 @@ func TestSaveWithCut(t *testing.T) {
}

func TestRecover(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -513,7 +520,7 @@ func TestScanWalName(t *testing.T) {
}

func TestRecoverAfterCut(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand All @@ -524,7 +531,7 @@ func TestRecoverAfterCut(t *testing.T) {
t.Fatal(err)
}
for i := 0; i < 10; i++ {
if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i), Term: 1, ConfState: &confState}); err != nil {
t.Fatal(err)
}
es := []raftpb.Entry{{Index: uint64(i)}}
Expand All @@ -542,7 +549,7 @@ func TestRecoverAfterCut(t *testing.T) {
}

for i := 0; i < 10; i++ {
w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i)})
w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i), Term: 1})
if err != nil {
if i <= 4 {
if err != ErrFileNotFound {
Expand Down Expand Up @@ -571,7 +578,7 @@ func TestRecoverAfterCut(t *testing.T) {
}

func TestOpenAtUncommittedIndex(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -605,7 +612,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
// it releases the lock of part of data, and excepts that OpenForRead
// can read out all files even if some are locked for write.
func TestOpenForRead(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -646,7 +653,7 @@ func TestOpenForRead(t *testing.T) {
}

func TestOpenWithMaxIndex(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -689,7 +696,7 @@ func TestSaveEmpty(t *testing.T) {
}

func TestReleaseLockTo(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -761,7 +768,7 @@ func TestReleaseLockTo(t *testing.T) {

// TestTailWriteNoSlackSpace ensures that tail writes append if there's no preallocated space.
func TestTailWriteNoSlackSpace(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -827,7 +834,7 @@ func TestTailWriteNoSlackSpace(t *testing.T) {

// TestRestartCreateWal ensures that an interrupted WAL initialization is clobbered on restart
func TestRestartCreateWal(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -867,7 +874,7 @@ func TestOpenOnTornWrite(t *testing.T) {
clobberIdx := 20
overwriteEntries := 5

p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -952,7 +959,7 @@ func TestOpenOnTornWrite(t *testing.T) {
}

func TestRenameFail(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand All @@ -964,7 +971,7 @@ func TestRenameFail(t *testing.T) {
}()
SegmentSizeBytes = math.MaxInt64

tp, terr := ioutil.TempDir(os.TempDir(), "waltest")
tp, terr := ioutil.TempDir(t.TempDir(), "waltest")
if terr != nil {
t.Fatal(terr)
}
Expand All @@ -982,7 +989,7 @@ func TestRenameFail(t *testing.T) {

// TestReadAllFail ensure ReadAll error if used without opening the WAL
func TestReadAllFail(t *testing.T) {
dir, err := ioutil.TempDir(os.TempDir(), "waltest")
dir, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
Expand All @@ -1004,18 +1011,18 @@ func TestReadAllFail(t *testing.T) {
// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting
// for hardstate
func TestValidSnapshotEntries(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
snap0 := walpb.Snapshot{Index: 0, Term: 0}
snap1 := walpb.Snapshot{Index: 1, Term: 1}
snap0 := walpb.Snapshot{}
snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState}
state1 := raftpb.HardState{Commit: 1, Term: 1}
snap2 := walpb.Snapshot{Index: 2, Term: 1}
snap3 := walpb.Snapshot{Index: 3, Term: 2}
snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState}
state2 := raftpb.HardState{Commit: 3, Term: 2}
snap4 := walpb.Snapshot{Index: 4, Term: 2} // will be orphaned since the last committed entry will be snap3
snap4 := walpb.Snapshot{Index: 4, Term: 2, ConfState: &confState} // will be orphaned since the last committed entry will be snap3
func() {
w, err := Create(zap.NewExample(), p, nil)
if err != nil {
Expand Down Expand Up @@ -1061,16 +1068,16 @@ func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) {
defer func() {
SegmentSizeBytes = oldSegmentSizeBytes
}()
p, err := ioutil.TempDir(os.TempDir(), "waltest")
p, err := ioutil.TempDir(t.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
snap0 := walpb.Snapshot{Index: 0, Term: 0}
snap1 := walpb.Snapshot{Index: 1, Term: 1}
snap0 := walpb.Snapshot{}
snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState}
state1 := raftpb.HardState{Commit: 1, Term: 1}
snap2 := walpb.Snapshot{Index: 2, Term: 1}
snap3 := walpb.Snapshot{Index: 3, Term: 2}
snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState}
state2 := raftpb.HardState{Commit: 3, Term: 2}
func() {
w, err := Create(zap.NewExample(), p, nil)
Expand Down
12 changes: 12 additions & 0 deletions server/wal/walpb/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,15 @@ func (rec *Record) Validate(crc uint32) error {
rec.Reset()
return ErrCRCMismatch
}

// ValidateSnapshotForWrite ensures the Snapshot the newly written snapshot is valid.
//
// There might exist log-entries written by old etcd versions that does not conform
// to the requirements.
func ValidateSnapshotForWrite(e *Snapshot) error {
// Since etcd>=3.5.0
if e.ConfState == nil && e.Index > 0 {
return errors.New("Saved (not-initial) snapshot is missing ConfState: " + e.String())
}
return nil
}
Loading

0 comments on commit 917895e

Please sign in to comment.