Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wal: add Verify function to perform corruption check on wal contents #10603

Merged
merged 2 commits into from
Apr 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 123 additions & 26 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,55 @@ func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) {
}

// openAtIndex builds a WAL over the files in dirpath selected for snap.
// The files are chosen by selectWALFiles and opened by openWALFiles; the
// write flag is forwarded to openWALFiles and additionally prepares the
// returned WAL for appending.
// It returns the error from file selection or opening unchanged, or the
// parseWalName error if the tail file name is invalid in write mode.
func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
	// selectWALFiles already reads the WAL names itself, so no separate
	// readWalNames call is needed here.
	names, nameIndex, err := selectWALFiles(dirpath, snap)
	if err != nil {
		return nil, err
	}

	rs, ls, closer, err := openWALFiles(dirpath, names, nameIndex, write)
	if err != nil {
		return nil, err
	}

	// create a WAL ready for reading
	w := &WAL{
		dir:       dirpath,
		start:     snap,
		decoder:   newDecoder(rs...),
		readClose: closer,
		locks:     ls,
	}

	if write {
		// write reuses the file descriptors from read; don't close so
		// WAL can append without dropping the file lock
		w.readClose = nil
		if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
			// The WAL is being abandoned, so release the files opened above.
			closer()
			return nil, err
		}
		w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
	}

	return w, nil
}

// selectWALFiles lists the WAL file names in dirpath and locates the position
// in that list from which reading should start for snap.Index.
// It returns the full name list and the starting position. If readWalNames
// fails, its error is returned. If searchIndex reports no match, or the names
// from the starting position onward do not pass isValidSeq, it returns
// ErrFileNotFound. On any error the name list is nil and the position is -1.
func selectWALFiles(dirpath string, snap walpb.Snapshot) ([]string, int, error) {
	names, err := readWalNames(dirpath)
	if err != nil {
		return nil, -1, err
	}

	nameIndex, ok := searchIndex(names, snap.Index)
	if !ok || !isValidSeq(names[nameIndex:]) {
		return nil, -1, ErrFileNotFound
	}

	return names, nameIndex, nil
}

func openWALFiles(dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) {
rcs := make([]io.ReadCloser, 0)
rs := make([]io.Reader, 0)
ls := make([]*fileutil.LockedFile, 0)
Expand All @@ -243,15 +281,15 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
if err != nil {
closeAll(rcs...)
return nil, err
return nil, nil, nil, err
}
ls = append(ls, l)
rcs = append(rcs, l)
} else {
rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
if err != nil {
closeAll(rcs...)
return nil, err
return nil, nil, nil, err
}
ls = append(ls, nil)
rcs = append(rcs, rf)
Expand All @@ -261,27 +299,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)

closer := func() error { return closeAll(rcs...) }

// create a WAL ready for reading
w := &WAL{
dir: dirpath,
start: snap,
decoder: newDecoder(rs...),
readClose: closer,
locks: ls,
}

if write {
// write reuses the file descriptors from read; don't close so
// WAL can append without dropping the file lock
w.readClose = nil
if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
closer()
return nil, err
}
w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
}

return w, nil
return rs, ls, closer, nil
}

// ReadAll reads out records of the current WAL.
Expand Down Expand Up @@ -398,6 +416,85 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
return metadata, state, ents, err
}

// Verify reads through the given WAL and verifies that it is not corrupted.
// It creates a new decoder to read through the records of the given WAL.
// It does not conflict with any open WAL, but it is recommended not to
// call this function after opening the WAL for writing.
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
// If the loaded snap doesn't match with the expected one, it will
// return error ErrSnapshotMismatch.
// It returns ErrMetadataConflict when two metadata records disagree,
// ErrCRCMismatch when a crc record does not match the running checksum, and
// an "unexpected block type" error for unknown record types.
// The WAL files opened for the check are closed before Verify returns,
// whichever path it returns on.
func Verify(walDir string, snap walpb.Snapshot) error {
	var metadata []byte
	var match bool

	rec := &walpb.Record{}

	names, nameIndex, err := selectWALFiles(walDir, snap)
	if err != nil {
		return err
	}

	// open wal files in read mode, so that there is no conflict
	// when the same WAL is opened elsewhere in write mode
	rs, _, closer, err := openWALFiles(walDir, names, nameIndex, false)
	if err != nil {
		return err
	}
	// Close the files on every exit path, including the early returns
	// inside the decode loop. The close error is deliberately ignored:
	// the files were only read, and the verification result is what
	// the caller needs.
	if closer != nil {
		defer closer()
	}

	// create a new decoder from the readers on the WAL files
	decoder := newDecoder(rs...)

	for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
		switch rec.Type {
		case metadataType:
			if metadata != nil && !bytes.Equal(metadata, rec.Data) {
				return ErrMetadataConflict
			}
			metadata = rec.Data
		case crcType:
			crc := decoder.crc.Sum32()
			// Current crc of decoder must match the crc of the record.
			// We need not match 0 crc, since the decoder is a new one at this point.
			if crc != 0 && rec.Validate(crc) != nil {
				return ErrCRCMismatch
			}
			decoder.updateCRC(rec.Crc)
		case snapshotType:
			var loadedSnap walpb.Snapshot
			pbutil.MustUnmarshal(&loadedSnap, rec.Data)
			if loadedSnap.Index == snap.Index {
				if loadedSnap.Term != snap.Term {
					return ErrSnapshotMismatch
				}
				match = true
			}
		case entryType, stateType:
			// We ignore all entry and state type records as these
			// are not necessary for validating the WAL contents
		default:
			return fmt.Errorf("unexpected block type %d", rec.Type)
		}
	}

	// We do not have to read out all the WAL entries
	// as the decoder is opened in read mode.
	if err != io.EOF && err != io.ErrUnexpectedEOF {
		return err
	}

	if !match {
		return ErrSnapshotNotFound
	}

	return nil
}

// cut closes current file written and creates a new one ready to append.
// cut first creates a temp wal file and writes necessary headers into it.
// Then cut atomically rename temp wal file to a wal file.
Expand Down
52 changes: 52 additions & 0 deletions wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"testing"
Expand Down Expand Up @@ -150,6 +151,57 @@ func TestOpenAtIndex(t *testing.T) {
}
}

// TestVerify tests that Verify returns a non-nil error when the WAL is corrupted.
// The test creates a WAL directory and cuts out multiple WAL files. Then
// it corrupts one of the files by completely truncating it.
func TestVerify(t *testing.T) {
	walDir, err := ioutil.TempDir(os.TempDir(), "waltest")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(walDir)

	// create WAL
	w, err := Create(walDir, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer w.Close()

	// make 5 separate files
	for i := 0; i < 5; i++ {
		// string(rune(...)) makes the int-to-rune conversion explicit; a bare
		// string(int) is rejected by the vet checks that `go test` runs.
		es := []raftpb.Entry{{Index: uint64(i), Data: []byte("waldata" + string(rune(i+1)))}}
		if err = w.Save(raftpb.HardState{}, es); err != nil {
			t.Fatal(err)
		}
		if err = w.cut(); err != nil {
			t.Fatal(err)
		}
	}

	// to verify the WAL is not corrupted at this point
	err = Verify(walDir, walpb.Snapshot{})
	if err != nil {
		t.Errorf("expected a nil error, got %v", err)
	}

	walFiles, err := ioutil.ReadDir(walDir)
	if err != nil {
		t.Fatal(err)
	}

	// corrupt the WAL by truncating one of the WAL files completely
	err = os.Truncate(path.Join(walDir, walFiles[2].Name()), 0)
	if err != nil {
		t.Fatal(err)
	}

	// the truncated file must now make verification fail
	err = Verify(walDir, walpb.Snapshot{})
	if err == nil {
		t.Error("expected a non-nil error, got nil")
	}
}

// TODO: split it into smaller tests for better readability
func TestCut(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
Expand Down