Skip to content

Commit

Permalink
mvcc: allow large concurrent reads under light write workload
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Feb 7, 2018
1 parent b83244b commit 8e8538b
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
6 changes: 1 addition & 5 deletions internal/mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,7 @@ func (b *backend) defrag() error {
b.mu.Lock()
defer b.mu.Unlock()

// block concurrent read requests while resetting tx
b.readTx.mu.Lock()
defer b.readTx.mu.Unlock()

b.batchTx.unsafeCommit(true)
b.batchTx.commit(true)
b.batchTx.tx = nil

tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
Expand Down
17 changes: 11 additions & 6 deletions internal/mvcc/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
limit = 1
}

for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
vs = append(vs, cv)
keys = append(keys, ck)
Expand Down Expand Up @@ -154,7 +155,7 @@ func (t *batchTx) Unlock() {
t.Mutex.Unlock()
}

func (t *batchTx) commit(stop bool) {
func (t *batchTx) commit(stop bool) bool {
// commit the last tx
if t.tx != nil {
if t.pending == 0 && !stop {
Expand All @@ -169,7 +170,7 @@ func (t *batchTx) commit(stop bool) {
db := t.tx.DB()
atomic.StoreInt64(&t.backend.size, size)
atomic.StoreInt64(&t.backend.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
return
return false
}

start := time.Now()
Expand All @@ -187,6 +188,7 @@ func (t *batchTx) commit(stop bool) {
if !stop {
t.tx = t.backend.begin(true)
}
return true
}

type batchTxBuffered struct {
Expand Down Expand Up @@ -231,22 +233,25 @@ func (t *batchTxBuffered) CommitAndStop() {
}

func (t *batchTxBuffered) commit(stop bool) {
flushed := t.batchTx.commit(stop)
if !flushed {
return
}

// all read txs must be closed to acquire boltdb commit rwlock
t.backend.readTx.mu.Lock()
defer t.backend.readTx.mu.Unlock()
t.unsafeCommit(stop)
t.unsafeReadTxCommit(stop)
}

func (t *batchTxBuffered) unsafeCommit(stop bool) {
func (t *batchTxBuffered) unsafeReadTxCommit(stop bool) {
if t.backend.readTx.tx != nil {
if err := t.backend.readTx.tx.Rollback(); err != nil {
plog.Fatalf("cannot rollback tx (%s)", err)
}
t.backend.readTx.reset()
}

t.batchTx.commit(stop)

if !stop {
t.backend.readTx.tx = t.backend.begin(false)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,8 @@ func TestTxnBlockBackendForceCommit(t *testing.T) {
s := NewStore(b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)

// Put a key into the store so that force commit can take effect.
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
txn := s.Read()

done := make(chan struct{})
Expand Down

0 comments on commit 8e8538b

Please sign in to comment.