Skip to content

Commit

Permalink
Merge pull request #2505 from nats-io/issue-2488-2
Browse files Browse the repository at this point in the history
[FIXED] #2488
  • Loading branch information
derekcollison authored Sep 9, 2021
2 parents 9e5526c + f753710 commit 6fa3a0e
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 6 deletions.
25 changes: 22 additions & 3 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
mb.dmap[seq] = struct{}{}
// Check if <50% utilization and minimum size met.
if mb.rbytes > compactMinimum && mb.rbytes>>1 > mb.bytes {
// FIXME(dlc) - Might want this out of band.
mb.compact()
}
}
Expand Down Expand Up @@ -1938,6 +1939,9 @@ func (mb *msgBlock) compact() {
return false
}

// For skip msgs.
var smh [msgHdrSize]byte

for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize >= lbuf {
return
Expand All @@ -1960,9 +1964,20 @@ func (mb *msgBlock) compact() {
firstSet = true
mb.first.seq = seq
}
} else if firstSet {
// This is an interior delete that we need to make sure we have a placeholder for.
le.PutUint32(smh[0:], emptyRecordLen)
le.PutUint64(smh[4:], seq|ebit)
le.PutUint64(smh[12:], 0)
le.PutUint16(smh[20:], 0)
nbuf = append(nbuf, smh[:]...)
mb.hh.Reset()
mb.hh.Write(smh[4:20])
checksum := mb.hh.Sum(nil)
nbuf = append(nbuf, checksum...)
}
// Always set last.
mb.last.seq = seq
mb.last.seq = seq &^ ebit
// Advance to next record.
index += rl
}
Expand All @@ -1986,7 +2001,9 @@ func (mb *msgBlock) compact() {
if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
return
}
os.Rename(mfn, mb.mfn)
if err := os.Rename(mfn, mb.mfn); err != nil {
return
}

// Close cache and open FDs and index file.
mb.clearCacheAndOffset()
Expand Down Expand Up @@ -2747,6 +2764,7 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock {
return mb
}
}

return nil
}

Expand Down Expand Up @@ -3168,8 +3186,10 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
return nil, err
}
if seq != mseq {
mb.cache.buf = nil
return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, mseq)
}

sm := &fileStoredMsg{
subj: subj,
hdr: hdr,
Expand All @@ -3179,7 +3199,6 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
mb: mb,
off: int64(bi),
}

return sm, nil
}

Expand Down
47 changes: 44 additions & 3 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3266,9 +3266,6 @@ func TestFileStoreSparseCompaction(t *testing.T) {
if ta >= tb {
t.Fatalf("Expected total after to be less then before, got %d vs %d", tb, ta)
}
if ta != ua {
t.Fatalf("Expected compact to make total and used same, got %d vs %d", ta, ua)
}
}

// Actual testing here.
Expand Down Expand Up @@ -3314,3 +3311,47 @@ func TestFileStoreSparseCompaction(t *testing.T) {
eraseMsgs(500, 502, 504, 506, 508, 510)
compact()
}

func TestFileStoreSparseCompactionWithInteriorDeletes(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
defer removeDir(t, storeDir)

cfg := StreamConfig{Name: "KV", Subjects: []string{"kv.>"}, Storage: FileStorage}
var fs *fileStore

fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for i := 1; i <= 1000; i++ {
if _, _, err := fs.StoreMsg(fmt.Sprintf("kv.%d", i%10), nil, []byte("OK")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

// Now do interior deletes.
for _, seq := range []uint64{500, 600, 700, 800} {
removed, err := fs.RemoveMsg(seq)
if err != nil || !removed {
t.Fatalf("Got an error on remove of %d: %v", seq, err)
}
}

_, _, _, _, err = fs.LoadMsg(900)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Do compact by hand, make sure we can still access msgs past the interior deletes.
fs.mu.RLock()
lmb := fs.lmb
lmb.dirtyCloseWithRemove(false)
lmb.compact()
fs.mu.RUnlock()

_, _, _, _, err = fs.LoadMsg(900)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

0 comments on commit 6fa3a0e

Please sign in to comment.