From 7b8fb3cf0ad524be6c08e2e2b4d978e228a9f5b8 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Fri, 28 Jul 2017 15:07:11 -0700 Subject: [PATCH 1/4] mvcc: add and implement Keep api to index Keep finds all revisions to be kept for a Compaction at the given rev. --- mvcc/index.go | 14 +++++++++++ mvcc/key_index.go | 61 +++++++++++++++++++++++++++++++++-------------- 2 files changed, 57 insertions(+), 18 deletions(-) diff --git a/mvcc/index.go b/mvcc/index.go index 991289cdd5c..8b8904516ed 100644 --- a/mvcc/index.go +++ b/mvcc/index.go @@ -28,6 +28,7 @@ type index interface { Tombstone(key []byte, rev revision) error RangeSince(key, end []byte, rev int64) []revision Compact(rev int64) map[revision]struct{} + Keep(rev int64) map[revision]struct{} Equal(b index) bool Insert(ki *keyIndex) @@ -179,6 +180,19 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { return available } +// Keep finds all revisions to be kept for a Compaction at the given rev. +func (ti *treeIndex) Keep(rev int64) map[revision]struct{} { + available := make(map[revision]struct{}) + ti.RLock() + defer ti.RUnlock() + ti.tree.Ascend(func(i btree.Item) bool { + keyi := i.(*keyIndex) + keyi.keep(rev, available) + return true + }) + return available +} + func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool { return func(i btree.Item) bool { keyi := i.(*keyIndex) diff --git a/mvcc/key_index.go b/mvcc/key_index.go index 9104f9b2d36..fe812008cc7 100644 --- a/mvcc/key_index.go +++ b/mvcc/key_index.go @@ -187,6 +187,42 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) { plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key)) } + genIdx, revIndex := ki.doCompact(atRev, available) + + g := ki.generations[genIdx] + if !g.isEmpty() { + // remove the previous contents. + if revIndex != -1 { + g.revs = g.revs[revIndex:] + } + // remove any tombstone + if len(g.revs) == 1 && genIdx != len(ki.generations)-1 { + delete(available, g.revs[0]) + genIdx++ + } + } + + // remove the previous generations. + ki.generations = ki.generations[genIdx:] +} + +// keep finds the revision to be kept if compact is called at given atRev. +func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) { + if ki.isEmpty() { + return + } + + genIdx, revIndex := ki.doCompact(atRev, available) + g := ki.generations[genIdx] + if !g.isEmpty() { + // remove any tombstone + if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 { + delete(available, g.revs[revIndex]) + } + } +} + +func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) { // walk until reaching the first revision that has an revision smaller or equal to // the atRev. // add it to the available map @@ -198,30 +234,19 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) { return true } - i, g := 0, &ki.generations[0] + genIdx, g := 0, &ki.generations[0] // find first generation includes atRev or created after atRev - for i < len(ki.generations)-1 { + for genIdx < len(ki.generations)-1 { if tomb := g.revs[len(g.revs)-1].main; tomb > atRev { break } - i++ - g = &ki.generations[i] + genIdx++ + g = &ki.generations[genIdx] } - if !g.isEmpty() { - n := g.walk(f) - // remove the previous contents. - if n != -1 { - g.revs = g.revs[n:] - } - // remove any tombstone - if len(g.revs) == 1 && i != len(ki.generations)-1 { - delete(available, g.revs[0]) - i++ - } - } - // remove the previous generations. - ki.generations = ki.generations[i:] + revIndex = g.walk(f) + + return genIdx, revIndex } func (ki *keyIndex) isEmpty() bool { From 4c2c5b00844b54a16d721f3a7c27f9a5db1d7cd6 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Fri, 28 Jul 2017 15:09:15 -0700 Subject: [PATCH 2/4] mvcc: add tests for Keep --- mvcc/index_test.go | 16 +++++++++---- mvcc/key_index.go | 4 ++-- mvcc/key_index_test.go | 52 ++++++++++++++++++++++++++++++++++++++---- mvcc/kvstore_test.go | 4 ++++ 4 files changed, 65 insertions(+), 11 deletions(-) diff --git a/mvcc/index_test.go b/mvcc/index_test.go index ef8df88fb3e..d05315601be 100644 --- a/mvcc/index_test.go +++ b/mvcc/index_test.go @@ -193,7 +193,7 @@ func TestIndexRangeSince(t *testing.T) { } } -func TestIndexCompact(t *testing.T) { +func TestIndexCompactAndKeep(t *testing.T) { maxRev := int64(20) tests := []struct { key []byte @@ -215,7 +215,7 @@ func TestIndexCompact(t *testing.T) { {[]byte("foo1"), false, revision{10, 1}, revision{10, 1}, 1}, } - // Continuous Compact + // Continuous Compact and Keep ti := newTreeIndex() for _, tt := range tests { if tt.remove { @@ -226,7 +226,10 @@ func TestIndexCompact(t *testing.T) { } for i := int64(1); i < maxRev; i++ { am := ti.Compact(i) - + keep := ti.Keep(i) + if !(reflect.DeepEqual(am, keep)) { + t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep) + } wti := &treeIndex{tree: btree.New(32)} for _, tt := range tests { if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) { @@ -242,7 +245,7 @@ func TestIndexCompact(t *testing.T) { } } - // Once Compact + // Once Compact and Keep for i := int64(1); i < maxRev; i++ { ti := newTreeIndex() for _, tt := range tests { @@ -253,7 +256,10 @@ func TestIndexCompact(t *testing.T) { } } am := ti.Compact(i) - + keep := ti.Keep(i) + if !(reflect.DeepEqual(am, keep)) { + t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep) + } wti := &treeIndex{tree: btree.New(32)} for _, tt := range tests { if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) { diff --git a/mvcc/key_index.go b/mvcc/key_index.go index fe812008cc7..8181d9d21c9 100644 --- a/mvcc/key_index.go +++ b/mvcc/key_index.go @@ -189,7 +189,7 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) { genIdx, revIndex := ki.doCompact(atRev, available) - g := ki.generations[genIdx] + g := &ki.generations[genIdx] if !g.isEmpty() { // remove the previous contents. if revIndex != -1 { @@ -213,7 +213,7 @@ func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) { } genIdx, revIndex := ki.doCompact(atRev, available) - g := ki.generations[genIdx] + g := &ki.generations[genIdx] if !g.isEmpty() { // remove any tombstone if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 { diff --git a/mvcc/key_index_test.go b/mvcc/key_index_test.go index bfaed9856e7..57e6a9cd769 100644 --- a/mvcc/key_index_test.go +++ b/mvcc/key_index_test.go @@ -205,7 +205,7 @@ func TestKeyIndexTombstone(t *testing.T) { } } -func TestKeyIndexCompact(t *testing.T) { +func TestKeyIndexCompactAndKeep(t *testing.T) { tests := []struct { compact int64 @@ -441,10 +441,19 @@ func TestKeyIndexCompact(t *testing.T) { }, } - // Continuous Compaction + // Continuous Compaction and finding Keep ki := newTestKeyIndex() for i, tt := range tests { am := make(map[revision]struct{}) + kiclone := cloneKeyIndex(ki) + ki.keep(tt.compact, am) + if !reflect.DeepEqual(ki, kiclone) { + t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone) + } + if !reflect.DeepEqual(am, tt.wam) { + t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) + } + am = make(map[revision]struct{}) ki.compact(tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) @@ -454,11 +463,20 @@ func TestKeyIndexCompact(t *testing.T) { } } - // Jump Compaction + // Jump Compaction and finding Keep ki = newTestKeyIndex() for i, tt := range tests { if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) { am := make(map[revision]struct{}) + kiclone := cloneKeyIndex(ki) + ki.keep(tt.compact, am) + if !reflect.DeepEqual(ki, kiclone) { + t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone) + } + if !reflect.DeepEqual(am, tt.wam) { + t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) + } + am = make(map[revision]struct{}) ki.compact(tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) @@ -469,10 +487,19 @@ func TestKeyIndexCompact(t *testing.T) { } } - // Once Compaction + kiClone := newTestKeyIndex() + // Once Compaction and finding Keep for i, tt := range tests { ki := newTestKeyIndex() am := make(map[revision]struct{}) + ki.keep(tt.compact, am) + if !reflect.DeepEqual(ki, kiClone) { + t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone) + } + if !reflect.DeepEqual(am, tt.wam) { + t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) + } + am = make(map[revision]struct{}) ki.compact(tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) @@ -483,6 +510,23 @@ func TestKeyIndexCompact(t *testing.T) { } } +func cloneKeyIndex(ki *keyIndex) *keyIndex { + generations := make([]generation, len(ki.generations)) + for i, gen := range ki.generations { + generations[i] = *cloneGeneration(&gen) + } + return &keyIndex{ki.key, ki.modified, generations} +} + +func cloneGeneration(g *generation) *generation { + if g.revs == nil { + return &generation{g.ver, g.created, nil} + } + tmp := make([]revision, len(g.revs)) + copy(tmp, g.revs) + return &generation{g.ver, g.created, tmp} +} + // test that compact on version that higher than last modified version works well func TestKeyIndexCompactOnFurtherRev(t *testing.T) { ki := &keyIndex{key: []byte("foo")} diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index f60ee2eb2ab..f2f8cd851d3 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -767,6 +767,10 @@ func (i *fakeIndex) Compact(rev int64) map[revision]struct{} { i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}}) return <-i.indexCompactRespc } +func (i *fakeIndex) Keep(rev int64) map[revision]struct{} { + i.Recorder.Record(testutil.Action{Name: "keep", Params: []interface{}{rev}}) + return <-i.indexCompactRespc +} func (i *fakeIndex) Equal(b index) bool { return false } func (i *fakeIndex) Insert(ki *keyIndex) { From bb86c327e2fdda5e8883f7f57f6ae7a15f5665d5 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Fri, 28 Jul 2017 16:17:00 -0700 Subject: [PATCH 3/4] mvcc: HashKV gets keep from kvindex.Keep --- mvcc/kvstore.go | 32 +++++--------------------------- 1 file changed, 5 insertions(+), 27 deletions(-) diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 618ca0786de..d062f5ba744 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -45,8 +45,6 @@ var ( ErrClosed = errors.New("mvcc: closed") plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc") - - emptyKeep = make(map[revision]struct{}) ) const ( @@ -101,12 +99,6 @@ type store struct { fifoSched schedule.Scheduler stopc chan struct{} - - // keepMu protects keep - keepMu sync.RWMutex - // keep contains all revisions <= compactMainRev to be kept for the - // ongoing compaction; nil otherwise. - keep map[revision]struct{} } // NewStore returns a new store. It is useful to create a store inside @@ -170,33 +162,25 @@ func (s *store) Hash() (hash uint32, revision int64, err error) { } func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) { - s.mu.Lock() + s.mu.RLock() s.revMu.RLock() compactRev, currentRev = s.compactMainRev, s.currentRev s.revMu.RUnlock() if rev > 0 && rev <= compactRev { - s.mu.Unlock() + s.mu.RUnlock() return 0, 0, compactRev, ErrCompacted } else if rev > 0 && rev > currentRev { - s.mu.Unlock() + s.mu.RUnlock() return 0, currentRev, 0, ErrFutureRev } - s.keepMu.Lock() - if s.keep == nil { - // ForceCommit ensures that txnRead begins after backend - // has committed all the changes from the prev completed compaction. - s.b.ForceCommit() - s.keep = emptyKeep - } - keep := s.keep - s.keepMu.Unlock() + keep := s.kvindex.Keep(rev) tx := s.b.ReadTx() tx.Lock() defer tx.Unlock() - s.mu.Unlock() + s.mu.RUnlock() if rev == 0 { rev = currentRev @@ -257,9 +241,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) { s.b.ForceCommit() keep := s.kvindex.Compact(rev) - s.keepMu.Lock() - s.keep = keep - s.keepMu.Unlock() ch := make(chan struct{}) var j = func(ctx context.Context) { if ctx.Err() != nil { @@ -271,9 +252,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) { return } close(ch) - s.keepMu.Lock() - s.keep = nil - s.keepMu.Unlock() } s.fifoSched.Schedule(j) From df5a3d15ce87e42cbdfb8645873cba0425143bcc Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Fri, 28 Jul 2017 16:49:09 -0700 Subject: [PATCH 4/4] mvcc: increase rev for TestHashKVWhenCompacting --- mvcc/kvstore_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index f2f8cd851d3..4135900c1b8 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -522,7 +522,7 @@ func TestHashKVWhenCompacting(t *testing.T) { s := NewStore(b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) - rev := 1000 + rev := 10000 for i := 2; i <= rev; i++ { s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease) }