From 642e23a7bcc5454ea685ffc97ecf223a4074eea5 Mon Sep 17 00:00:00 2001
From: Ibrahim Jarif
Date: Tue, 23 Apr 2019 21:29:04 +0530
Subject: [PATCH] Set readTs in write batch to latest done readTs so that we
 can discard older versions of keys during compactions.

Every Transaction stores the latest value of `readTs` it is aware of. When the
transaction is discarded (which happens even when we commit), the global value
of `readMark` is updated:

https://github.com/dgraph-io/badger/blob/1fcc96ecdb66d221df85cddec186b6ac7b6dab4b/txn.go#L501-L503

Previously, the `readTs` of the transaction inside the write batch struct was
set to 0, so the global value of `readMark` would also be set to 0 (unless
someone ran a transaction after using the write batch). Due to the 0 value of
the global `readMark`, the compaction algorithm would skip all the values that
were inserted via the write batch. See
https://github.com/dgraph-io/badger/blob/1fcc96ecdb66d221df85cddec186b6ac7b6dab4b/levels.go#L480-L484
and
https://github.com/dgraph-io/badger/blob/1fcc96ecdb66d221df85cddec186b6ac7b6dab4b/txn.go#L138-L145

The `o.readMark.DoneUntil()` call would always return `0`, so compaction could
never discard the older versions of keys written through the write batch.

With this commit, the compaction algorithm works as expected with key-values
inserted via the Transaction API or via the Write Batch API. See
https://github.com/dgraph-io/badger/issues/767
---
 batch.go      | 12 +++++++++--
 batch_test.go | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/batch.go b/batch.go
index 2c26d4b07..2187f9c6d 100644
--- a/batch.go
+++ b/batch.go
@@ -36,7 +36,14 @@ type WriteBatch struct {
 // creating and committing transactions. Due to the nature of SSI guarantees provided by Badger,
 // blind writes can never encounter transaction conflicts (ErrConflict).
 func (db *DB) NewWriteBatch() *WriteBatch {
-	return &WriteBatch{db: db, txn: db.newTransaction(true, true)}
+	txn := db.newTransaction(true, true)
+	// If we let it stay at zero, compactions would not allow older key versions to be deleted,
+	// because the read timestamps of pending txns would be zero. Therefore, we set it to the
+	// maximum read timestamp that's done. This allows compactions to discard key versions below
+	// this read timestamp, while also not blocking on pending txns to finish before starting this
+	// one.
+	txn.readTs = db.orc.readMark.DoneUntil()
+	return &WriteBatch{db: db, txn: txn}
 }
 
 // Cancel function must be called if there's a chance that Flush might not get
@@ -128,7 +135,8 @@ func (wb *WriteBatch) commit() error {
 	wb.wg.Add(1)
 	wb.txn.CommitWith(wb.callback)
 	wb.txn = wb.db.newTransaction(true, true)
-	wb.txn.readTs = 0 // We're not reading anything.
+	// See comment about readTs in NewWriteBatch.
+	wb.txn.readTs = wb.db.orc.readMark.DoneUntil()
 	return wb.err
 }
 
diff --git a/batch_test.go b/batch_test.go
index 041fe92a9..85c8fcb9b 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -18,6 +18,8 @@ package badger
 
 import (
 	"fmt"
+	"io/ioutil"
+	"os"
 	"testing"
 	"time"
 
@@ -67,3 +69,59 @@ func TestWriteBatch(t *testing.T) {
 		require.NoError(t, err)
 	})
 }
+
+func TestWriteBatchCompaction(t *testing.T) {
+	dir, err := ioutil.TempDir(".", "badger-test")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	opts := DefaultOptions
+	opts.ValueDir = dir
+	opts.Dir = dir
+
+	db, err := Open(opts)
+	require.NoError(t, err)
+
+	wb := db.NewWriteBatch()
+	entries := 10000
+	for i := 0; i < entries; i++ {
+		require.Nil(t, wb.Set([]byte(fmt.Sprintf("foo%d", i)), []byte("bar"), 0))
+	}
+	require.Nil(t, wb.Flush())
+
+	wb = db.NewWriteBatch()
+	// Delete 50% of the entries.
+	for i := 0; i < entries/2; i++ {
+		require.Nil(t, wb.Delete([]byte(fmt.Sprintf("foo%d", i))))
+	}
+	require.Nil(t, wb.Flush())
+
+	// It is necessary to call db.Update(..) before compaction so that the
+	// db.orc.readMark value is incremented. The write batch transactions do
+	// not increment db.orc.readMark on their own, so without this update
+	// compaction would not discard any entries added by the write batch.
+	require.Nil(t, db.Update(func(txn *Txn) error {
+		return txn.Set([]byte("key1"), []byte("val1"))
+	}))
+
+	// Close the DB to force compaction.
+	require.Nil(t, db.Close())
+
+	db, err = Open(opts)
+	require.NoError(t, err)
+	defer db.Close()
+
+	iopt := DefaultIteratorOptions
+	iopt.AllVersions = true
+	txn := db.NewTransaction(false)
+	defer txn.Discard()
+	it := txn.NewIterator(iopt)
+	defer it.Close()
+	countAfterCompaction := 0
+	for it.Rewind(); it.Valid(); it.Next() {
+		countAfterCompaction++
+	}
+	// We deleted 50% of the keys, so after compaction the number of versions
+	// left must be strictly less than the entries + entries/2 versions written.
+	require.Less(t, countAfterCompaction, entries+entries/2)
+}
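
For context, here is a minimal usage sketch of the pattern this patch fixes. This is a sketch only, assuming the pre-v2 API used in the diff above (`DefaultOptions` as a struct value, `WriteBatch.Set(key, val, meta)`); the `main` wrapper, directory name, keys, and counts are illustrative and not part of the patch:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"

	"github.com/dgraph-io/badger"
)

func main() {
	// Illustrative temp directory; cleaned up on exit.
	dir, err := ioutil.TempDir("", "badger-wb")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	opts := badger.DefaultOptions
	opts.Dir = dir
	opts.ValueDir = dir

	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// First batch: blind writes. With this patch, the batch's internal
	// transaction carries readMark.DoneUntil() as its readTs instead of 0.
	wb := db.NewWriteBatch()
	for i := 0; i < 1000; i++ {
		if err := wb.Set([]byte(fmt.Sprintf("key%d", i)), []byte("v1"), 0); err != nil {
			log.Fatal(err)
		}
	}
	if err := wb.Flush(); err != nil {
		log.Fatal(err)
	}

	// Second batch: overwrite every key, leaving the "v1" versions stale.
	// Before the patch, the zero readTs kept the global readMark at 0 (absent
	// other transactions), so compaction could never discard those versions.
	wb = db.NewWriteBatch()
	for i := 0; i < 1000; i++ {
		if err := wb.Set([]byte(fmt.Sprintf("key%d", i)), []byte("v2"), 0); err != nil {
			log.Fatal(err)
		}
	}
	if err := wb.Flush(); err != nil {
		log.Fatal(err)
	}

	// As in the test above, a normal transaction is still needed to advance
	// db.orc.readMark; write batch transactions do not increment it themselves.
	if err := db.Update(func(txn *badger.Txn) error {
		return txn.Set([]byte("key"), []byte("val"))
	}); err != nil {
		log.Fatal(err)
	}
	// Closing the DB (deferred above) flushes memtables; subsequent
	// compactions may then drop the stale "v1" versions.
}

Setting the batch's readTs to `DoneUntil()` rather than allocating a fresh read timestamp means `NewWriteBatch` never blocks waiting for pending transactions to finish, while still letting compactions discard key versions at or below that watermark.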