Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mvcc: fix TestHashKVWhenCompacting hash mismatch #8333

Merged
merged 4 commits into from
Aug 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type index interface {
Tombstone(key []byte, rev revision) error
RangeSince(key, end []byte, rev int64) []revision
Compact(rev int64) map[revision]struct{}
Keep(rev int64) map[revision]struct{}
Equal(b index) bool

Insert(ki *keyIndex)
Expand Down Expand Up @@ -179,6 +180,19 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
return available
}

// Keep finds all revisions to be kept for a Compaction at the given rev.
func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
ti.RLock()
defer ti.RUnlock()
ti.tree.Ascend(func(i btree.Item) bool {
keyi := i.(*keyIndex)
keyi.keep(rev, available)
return true
})
return available
}

func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
return func(i btree.Item) bool {
keyi := i.(*keyIndex)
Expand Down
16 changes: 11 additions & 5 deletions mvcc/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestIndexRangeSince(t *testing.T) {
}
}

func TestIndexCompact(t *testing.T) {
func TestIndexCompactAndKeep(t *testing.T) {
maxRev := int64(20)
tests := []struct {
key []byte
Expand All @@ -215,7 +215,7 @@ func TestIndexCompact(t *testing.T) {
{[]byte("foo1"), false, revision{10, 1}, revision{10, 1}, 1},
}

// Continuous Compact
// Continuous Compact and Keep
ti := newTreeIndex()
for _, tt := range tests {
if tt.remove {
Expand All @@ -226,7 +226,10 @@ func TestIndexCompact(t *testing.T) {
}
for i := int64(1); i < maxRev; i++ {
am := ti.Compact(i)

keep := ti.Keep(i)
if !(reflect.DeepEqual(am, keep)) {
t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep)
}
wti := &treeIndex{tree: btree.New(32)}
for _, tt := range tests {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) {
Expand All @@ -242,7 +245,7 @@ func TestIndexCompact(t *testing.T) {
}
}

// Once Compact
// Once Compact and Keep
for i := int64(1); i < maxRev; i++ {
ti := newTreeIndex()
for _, tt := range tests {
Expand All @@ -253,7 +256,10 @@ func TestIndexCompact(t *testing.T) {
}
}
am := ti.Compact(i)

keep := ti.Keep(i)
if !(reflect.DeepEqual(am, keep)) {
t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep)
}
wti := &treeIndex{tree: btree.New(32)}
for _, tt := range tests {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) {
Expand Down
61 changes: 43 additions & 18 deletions mvcc/key_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,42 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
}

genIdx, revIndex := ki.doCompact(atRev, available)

g := &ki.generations[genIdx]
if !g.isEmpty() {
// remove the previous contents.
if revIndex != -1 {
g.revs = g.revs[revIndex:]
}
// remove any tombstone
if len(g.revs) == 1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[0])
genIdx++
}
}

// remove the previous generations.
ki.generations = ki.generations[genIdx:]
}

// keep finds the revision to be kept if compact is called at given atRev.
func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
if ki.isEmpty() {
return
}

genIdx, revIndex := ki.doCompact(atRev, available)
g := &ki.generations[genIdx]
if !g.isEmpty() {
// remove any tombstone
if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[revIndex])
}
}
}

func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
// walk until reaching the first revision that has an revision smaller or equal to
// the atRev.
// add it to the available map
Expand All @@ -198,30 +234,19 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
return true
}

i, g := 0, &ki.generations[0]
genIdx, g := 0, &ki.generations[0]
// find first generation includes atRev or created after atRev
for i < len(ki.generations)-1 {
for genIdx < len(ki.generations)-1 {
if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
break
}
i++
g = &ki.generations[i]
genIdx++
g = &ki.generations[genIdx]
}

if !g.isEmpty() {
n := g.walk(f)
// remove the previous contents.
if n != -1 {
g.revs = g.revs[n:]
}
// remove any tombstone
if len(g.revs) == 1 && i != len(ki.generations)-1 {
delete(available, g.revs[0])
i++
}
}
// remove the previous generations.
ki.generations = ki.generations[i:]
revIndex = g.walk(f)

return genIdx, revIndex
}

func (ki *keyIndex) isEmpty() bool {
Expand Down
52 changes: 48 additions & 4 deletions mvcc/key_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestKeyIndexTombstone(t *testing.T) {
}
}

func TestKeyIndexCompact(t *testing.T) {
func TestKeyIndexCompactAndKeep(t *testing.T) {
tests := []struct {
compact int64

Expand Down Expand Up @@ -441,10 +441,19 @@ func TestKeyIndexCompact(t *testing.T) {
},
}

// Continuous Compaction
// Continuous Compaction and finding Keep
ki := newTestKeyIndex()
for i, tt := range tests {
am := make(map[revision]struct{})
kiclone := cloneKeyIndex(ki)
ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiclone) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone)
}
if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
ki.compact(tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
Expand All @@ -454,11 +463,20 @@ func TestKeyIndexCompact(t *testing.T) {
}
}

// Jump Compaction
// Jump Compaction and finding Keep
ki = newTestKeyIndex()
for i, tt := range tests {
if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) {
am := make(map[revision]struct{})
kiclone := cloneKeyIndex(ki)
ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiclone) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone)
}
if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
ki.compact(tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
Expand All @@ -469,10 +487,19 @@ func TestKeyIndexCompact(t *testing.T) {
}
}

// Once Compaction
kiClone := newTestKeyIndex()
// Once Compaction and finding Keep
for i, tt := range tests {
ki := newTestKeyIndex()
am := make(map[revision]struct{})
ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiClone) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone)
}
if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
ki.compact(tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
Expand All @@ -483,6 +510,23 @@ func TestKeyIndexCompact(t *testing.T) {
}
}

func cloneKeyIndex(ki *keyIndex) *keyIndex {
generations := make([]generation, len(ki.generations))
for i, gen := range ki.generations {
generations[i] = *cloneGeneration(&gen)
}
return &keyIndex{ki.key, ki.modified, generations}
}

func cloneGeneration(g *generation) *generation {
if g.revs == nil {
return &generation{g.ver, g.created, nil}
}
tmp := make([]revision, len(g.revs))
copy(tmp, g.revs)
return &generation{g.ver, g.created, tmp}
}

// test that compact on version that higher than last modified version works well
func TestKeyIndexCompactOnFurtherRev(t *testing.T) {
ki := &keyIndex{key: []byte("foo")}
Expand Down
32 changes: 5 additions & 27 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ var (
ErrClosed = errors.New("mvcc: closed")

plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")

emptyKeep = make(map[revision]struct{})
)

const (
Expand Down Expand Up @@ -101,12 +99,6 @@ type store struct {
fifoSched schedule.Scheduler

stopc chan struct{}

// keepMu protects keep
keepMu sync.RWMutex
// keep contains all revisions <= compactMainRev to be kept for the
// ongoing compaction; nil otherwise.
keep map[revision]struct{}
}

// NewStore returns a new store. It is useful to create a store inside
Expand Down Expand Up @@ -170,33 +162,25 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
}

func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
s.mu.Lock()
s.mu.RLock()
s.revMu.RLock()
compactRev, currentRev = s.compactMainRev, s.currentRev
s.revMu.RUnlock()

if rev > 0 && rev <= compactRev {
s.mu.Unlock()
s.mu.RUnlock()
return 0, 0, compactRev, ErrCompacted
} else if rev > 0 && rev > currentRev {
s.mu.Unlock()
s.mu.RUnlock()
return 0, currentRev, 0, ErrFutureRev
}

s.keepMu.Lock()
if s.keep == nil {
// ForceCommit ensures that txnRead begins after backend
// has committed all the changes from the prev completed compaction.
s.b.ForceCommit()
s.keep = emptyKeep
}
keep := s.keep
s.keepMu.Unlock()
keep := s.kvindex.Keep(rev)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only need to acquire s.mu.RLock instead of s.mu.Lock now?


tx := s.b.ReadTx()
tx.Lock()
defer tx.Unlock()
s.mu.Unlock()
s.mu.RUnlock()

if rev == 0 {
rev = currentRev
Expand Down Expand Up @@ -257,9 +241,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.b.ForceCommit()

keep := s.kvindex.Compact(rev)
s.keepMu.Lock()
s.keep = keep
s.keepMu.Unlock()
ch := make(chan struct{})
var j = func(ctx context.Context) {
if ctx.Err() != nil {
Expand All @@ -271,9 +252,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
return
}
close(ch)
s.keepMu.Lock()
s.keep = nil
s.keepMu.Unlock()
}

s.fifoSched.Schedule(j)
Expand Down
6 changes: 5 additions & 1 deletion mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
s := NewStore(b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)

rev := 1000
rev := 10000
for i := 2; i <= rev; i++ {
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
}
Expand Down Expand Up @@ -767,6 +767,10 @@ func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
return <-i.indexCompactRespc
}
func (i *fakeIndex) Keep(rev int64) map[revision]struct{} {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be in the same commit as the test update

i.Recorder.Record(testutil.Action{Name: "keep", Params: []interface{}{rev}})
return <-i.indexCompactRespc
}
func (i *fakeIndex) Equal(b index) bool { return false }

func (i *fakeIndex) Insert(ki *keyIndex) {
Expand Down