Skip to content

Commit

Permalink
*: let iter support lowerBound
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Mo <[email protected]>
  • Loading branch information
Defined2014 committed Jul 12, 2023
1 parent 5e3e38b commit bbc1514
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 31 deletions.
7 changes: 4 additions & 3 deletions internal/unionstore/memdb_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ func (db *MemDB) Iter(k []byte, upperBound []byte) (Iterator, error) {
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// The returned iterator will iterate from greater key to smaller key.
// If k is nil, the returned iterator will be positioned at the last key.
// TODO: Add lower bound limit
func (db *MemDB) IterReverse(k []byte) (Iterator, error) {
// It yields only keys that >= lowerBound. If lowerBound is nil, it means the lowerBound is unbounded.
func (db *MemDB) IterReverse(k []byte, lowerBound []byte) (Iterator, error) {
i := &MemdbIterator{
db: db,
start: lowerBound,
end: k,
reverse: true,
}
Expand Down Expand Up @@ -128,7 +129,7 @@ func (i *MemdbIterator) Valid() bool {
if !i.reverse {
return !i.curr.isNull() && (i.end == nil || bytes.Compare(i.Key(), i.end) < 0)
}
return !i.curr.isNull()
return !i.curr.isNull() && (i.start == nil || bytes.Compare(i.Key(), i.start) >= 0)
}

// Flags returns flags belong to current iterator.
Expand Down
29 changes: 26 additions & 3 deletions internal/unionstore/memdb_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ func (db *MemDB) SnapshotIter(start, end []byte) Iterator {
return it
}

// SnapshotIterReverse returns a reverse Iterator for a snapshot of MemBuffer.
func (db *MemDB) SnapshotIterReverse(k, lowerBound []byte) Iterator {
it := &memdbSnapIter{
MemdbIterator: &MemdbIterator{
db: db,
start: lowerBound,
end: k,
reverse: true,
},
cp: db.getSnapshot(),
}
it.init()
return it
}

func (db *MemDB) getSnapshot() MemDBCheckpoint {
if len(db.stages) > 0 {
return db.stages[0]
Expand Down Expand Up @@ -123,10 +138,18 @@ func (i *memdbSnapIter) setValue() bool {
}

func (i *memdbSnapIter) init() {
if len(i.start) == 0 {
i.seekToFirst()
if i.reverse {
if len(i.end) == 0 {
i.seekToLast()
} else {
i.seek(i.end)
}
} else {
i.seek(i.start)
if len(i.start) == 0 {
i.seekToFirst()
} else {
i.seek(i.start)
}
}

if !i.setValue() {
Expand Down
38 changes: 29 additions & 9 deletions internal/unionstore/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,34 @@ func TestIterator(t *testing.T) {
}
assert.Equal(i, cnt)

i--
for it, _ := db.IterReverse(nil); it.Valid(); it.Next() {
for it, _ := db.IterReverse(nil, nil); it.Valid(); it.Next() {
binary.BigEndian.PutUint32(buf[:], uint32(i-1))
assert.Equal(it.Key(), buf[:])
assert.Equal(it.Value(), buf[:])
i--
}
assert.Equal(i, 0)

var upperBoundBytes, lowerBoundBytes [4]byte
bound := 400
binary.BigEndian.PutUint32(upperBoundBytes[:], uint32(bound))
for it, _ := db.Iter(nil, upperBoundBytes[:]); it.Valid(); it.Next() {
binary.BigEndian.PutUint32(buf[:], uint32(i))
assert.Equal(it.Key(), buf[:])
assert.Equal(it.Value(), buf[:])
i++
}
assert.Equal(i, bound)

i = cnt
binary.BigEndian.PutUint32(lowerBoundBytes[:], uint32(bound))
for it, _ := db.IterReverse(nil, lowerBoundBytes[:]); it.Valid(); it.Next() {
binary.BigEndian.PutUint32(buf[:], uint32(i-1))
assert.Equal(it.Key(), buf[:])
assert.Equal(it.Value(), buf[:])
i--
}
assert.Equal(i, -1)
assert.Equal(i, bound)
}

func TestDiscard(t *testing.T) {
Expand Down Expand Up @@ -139,7 +159,7 @@ func TestDiscard(t *testing.T) {
assert.Equal(i, cnt)

i--
for it, _ := db.IterReverse(nil); it.Valid(); it.Next() {
for it, _ := db.IterReverse(nil, nil); it.Valid(); it.Next() {
binary.BigEndian.PutUint32(buf[:], uint32(i))
assert.Equal(it.Key(), buf[:])
assert.Equal(it.Value(), buf[:])
Expand Down Expand Up @@ -197,7 +217,7 @@ func TestFlushOverwrite(t *testing.T) {
assert.Equal(i, cnt)

i--
for it, _ := db.IterReverse(nil); it.Valid(); it.Next() {
for it, _ := db.IterReverse(nil, nil); it.Valid(); it.Next() {
binary.BigEndian.PutUint32(kbuf[:], uint32(i))
binary.BigEndian.PutUint32(vbuf[:], uint32(i+1))
assert.Equal(it.Key(), kbuf[:])
Expand Down Expand Up @@ -279,7 +299,7 @@ func TestNestedSandbox(t *testing.T) {
assert.Equal(i, 200)

i--
for it, _ := db.IterReverse(nil); it.Valid(); it.Next() {
for it, _ := db.IterReverse(nil, nil); it.Valid(); it.Next() {
binary.BigEndian.PutUint32(kbuf[:], uint32(i))
binary.BigEndian.PutUint32(vbuf[:], uint32(i))
if i < 100 {
Expand Down Expand Up @@ -336,7 +356,7 @@ func TestOverwrite(t *testing.T) {
assert.Equal(i, cnt)

i--
for it, _ := db.IterReverse(nil); it.Valid(); it.Next() {
for it, _ := db.IterReverse(nil, nil); it.Valid(); it.Next() {
binary.BigEndian.PutUint32(buf[:], uint32(i))
assert.Equal(it.Key(), buf[:])
v := binary.BigEndian.Uint32(it.Value())
Expand Down Expand Up @@ -569,7 +589,7 @@ func checkConsist(t *testing.T, p1 *MemDB, p2 *leveldb.DB) {
assert.Equal(it.Value(), it2.Value())

if prevKey != nil {
it, _ = p1.IterReverse(it2.Key())
it, _ = p1.IterReverse(it2.Key(), nil)
assert.Equal(it.Key(), prevKey)
assert.Equal(it.Value(), prevVal)
}
Expand All @@ -579,7 +599,7 @@ func checkConsist(t *testing.T, p1 *MemDB, p2 *leveldb.DB) {
prevVal = it2.Value()
}

it1, _ = p1.IterReverse(nil)
it1, _ = p1.IterReverse(nil, nil)
for it2.Last(); it2.Valid(); it2.Prev() {
assert.Equal(it1.Key(), it2.Key())
assert.Equal(it1.Value(), it2.Value())
Expand Down
4 changes: 2 additions & 2 deletions internal/unionstore/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (s *mockSnapshot) Iter(k []byte, upperBound []byte) (Iterator, error) {
return s.store.Iter(k, upperBound)
}

func (s *mockSnapshot) IterReverse(k []byte) (Iterator, error) {
return s.store.IterReverse(k)
func (s *mockSnapshot) IterReverse(k, lowerBound []byte) (Iterator, error) {
return s.store.IterReverse(k, lowerBound)
}

func (s *mockSnapshot) SetOption(opt int, val interface{}) {}
9 changes: 4 additions & 5 deletions internal/unionstore/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ type uSnapshot interface {
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// The returned iterator will iterate from greater key to smaller key.
// If k is nil, the returned iterator will be positioned at the last key.
// TODO: Add lower bound limit
IterReverse(k []byte) (Iterator, error)
IterReverse(k, lowerBound []byte) (Iterator, error)
}

// KVUnionStore is an in-memory Store which contains a buffer for write and a
Expand Down Expand Up @@ -124,12 +123,12 @@ func (us *KVUnionStore) Iter(k, upperBound []byte) (Iterator, error) {
}

// IterReverse implements the Retriever interface.
func (us *KVUnionStore) IterReverse(k []byte) (Iterator, error) {
bufferIt, err := us.memBuffer.IterReverse(k)
func (us *KVUnionStore) IterReverse(k, lowerBound []byte) (Iterator, error) {
bufferIt, err := us.memBuffer.IterReverse(k, lowerBound)
if err != nil {
return nil, err
}
retrieverIt, err := us.snapshot.IterReverse(k)
retrieverIt, err := us.snapshot.IterReverse(k, lowerBound)
if err != nil {
return nil, err
}
Expand Down
12 changes: 8 additions & 4 deletions internal/unionstore/union_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,27 @@ func TestUnionStoreIterReverse(t *testing.T) {
err = store.Set([]byte("3"), []byte("3"))
assert.Nil(err)

iter, err := us.IterReverse(nil)
iter, err := us.IterReverse(nil, nil)
assert.Nil(err)
checkIterator(t, iter, [][]byte{[]byte("3"), []byte("2"), []byte("1")}, [][]byte{[]byte("3"), []byte("2"), []byte("1")})

iter, err = us.IterReverse([]byte("3"))
iter, err = us.IterReverse([]byte("3"), []byte("1"))
assert.Nil(err)
checkIterator(t, iter, [][]byte{[]byte("2"), []byte("1")}, [][]byte{[]byte("2"), []byte("1")})

iter, err = us.IterReverse([]byte("3"), nil)
assert.Nil(err)
checkIterator(t, iter, [][]byte{[]byte("2"), []byte("1")}, [][]byte{[]byte("2"), []byte("1")})

err = us.GetMemBuffer().Set([]byte("0"), []byte("0"))
assert.Nil(err)
iter, err = us.IterReverse([]byte("3"))
iter, err = us.IterReverse([]byte("3"), nil)
assert.Nil(err)
checkIterator(t, iter, [][]byte{[]byte("2"), []byte("1"), []byte("0")}, [][]byte{[]byte("2"), []byte("1"), []byte("0")})

err = us.GetMemBuffer().Delete([]byte("1"))
assert.Nil(err)
iter, err = us.IterReverse([]byte("3"))
iter, err = us.IterReverse([]byte("3"), nil)
assert.Nil(err)
checkIterator(t, iter, [][]byte{[]byte("2"), []byte("0")}, [][]byte{[]byte("2"), []byte("0")})
}
Expand Down
4 changes: 2 additions & 2 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ func (txn *KVTxn) Iter(k []byte, upperBound []byte) (unionstore.Iterator, error)
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
func (txn *KVTxn) IterReverse(k []byte) (unionstore.Iterator, error) {
return txn.us.IterReverse(k)
func (txn *KVTxn) IterReverse(k, lowerBound []byte) (unionstore.Iterator, error) {
return txn.us.IterReverse(k, lowerBound)
}

// Delete removes the entry for key k from kv store.
Expand Down
2 changes: 1 addition & 1 deletion txnkv/txnsnapshot/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *Scanner) Next() error {
}

current := s.cache[s.idx]
if (!s.reverse && (len(s.endKey) > 0 && kv.CmpKey(current.Key, s.endKey) >= 0)) ||
if (!s.reverse && len(s.endKey) > 0 && kv.CmpKey(current.Key, s.endKey) >= 0) ||
(s.reverse && len(s.nextStartKey) > 0 && kv.CmpKey(current.Key, s.nextStartKey) < 0) {
s.eof = true
s.Close()
Expand Down
4 changes: 2 additions & 2 deletions txnkv/txnsnapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,8 @@ func (s *KVSnapshot) Iter(k []byte, upperBound []byte) (unionstore.Iterator, err
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
func (s *KVSnapshot) IterReverse(k []byte) (unionstore.Iterator, error) {
scanner, err := newScanner(s, nil, k, s.scanBatchSize, true)
func (s *KVSnapshot) IterReverse(k, lowerBound []byte) (unionstore.Iterator, error) {
scanner, err := newScanner(s, lowerBound, k, s.scanBatchSize, true)
return scanner, err
}

Expand Down

0 comments on commit bbc1514

Please sign in to comment.