diff --git a/server/filestore.go b/server/filestore.go index 4de4d0dcda..4abc0c3c7f 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1854,6 +1854,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error mb.dmap[seq] = struct{}{} // Check if <50% utilization and minimum size met. if mb.rbytes > compactMinimum && mb.rbytes>>1 > mb.bytes { + // FIXME(dlc) - Might want this out of band. mb.compact() } } @@ -1938,6 +1939,9 @@ func (mb *msgBlock) compact() { return false } + // For skip msgs. + var smh [msgHdrSize]byte + for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { if index+msgHdrSize >= lbuf { return @@ -1960,9 +1964,20 @@ func (mb *msgBlock) compact() { firstSet = true mb.first.seq = seq } + } else if firstSet { + // This is an interior delete that we need to make sure we have a placeholder for. + le.PutUint32(smh[0:], emptyRecordLen) + le.PutUint64(smh[4:], seq|ebit) + le.PutUint64(smh[12:], 0) + le.PutUint16(smh[20:], 0) + nbuf = append(nbuf, smh[:]...) + mb.hh.Reset() + mb.hh.Write(smh[4:20]) + checksum := mb.hh.Sum(nil) + nbuf = append(nbuf, checksum...) } // Always set last. - mb.last.seq = seq + mb.last.seq = seq &^ ebit // Advance to next record. index += rl } @@ -1986,7 +2001,9 @@ func (mb *msgBlock) compact() { if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil { return } - os.Rename(mfn, mb.mfn) + if err := os.Rename(mfn, mb.mfn); err != nil { + return + } // Close cache and open FDs and index file. mb.clearCacheAndOffset() @@ -2747,6 +2764,7 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock { return mb } } + return nil } @@ -3168,8 +3186,10 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) { return nil, err } if seq != mseq { + mb.cache.buf = nil return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, mseq) } + sm := &fileStoredMsg{ subj: subj, hdr: hdr, @@ -3179,7 +3199,6 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) { mb: mb, off: int64(bi), } - return sm, nil } diff --git a/server/filestore_test.go b/server/filestore_test.go index 2ea1812c99..5895ab16bf 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -3266,9 +3266,6 @@ func TestFileStoreSparseCompaction(t *testing.T) { if ta >= tb { t.Fatalf("Expected total after to be less then before, got %d vs %d", tb, ta) } - if ta != ua { - t.Fatalf("Expected compact to make total and used same, got %d vs %d", ta, ua) - } } // Actual testing here. @@ -3314,3 +3311,47 @@ func TestFileStoreSparseCompaction(t *testing.T) { eraseMsgs(500, 502, 504, 506, 508, 510) compact() } + +func TestFileStoreSparseCompactionWithInteriorDeletes(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + + cfg := StreamConfig{Name: "KV", Subjects: []string{"kv.>"}, Storage: FileStorage} + var fs *fileStore + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for i := 1; i <= 1000; i++ { + if _, _, err := fs.StoreMsg(fmt.Sprintf("kv.%d", i%10), nil, []byte("OK")); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + // Now do interior deletes. + for _, seq := range []uint64{500, 600, 700, 800} { + removed, err := fs.RemoveMsg(seq) + if err != nil || !removed { + t.Fatalf("Got an error on remove of %d: %v", seq, err) + } + } + + _, _, _, _, err = fs.LoadMsg(900) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Do compact by hand, make sure we can still access msgs past the interior deletes. + fs.mu.RLock() + lmb := fs.lmb + lmb.dirtyCloseWithRemove(false) + lmb.compact() + fs.mu.RUnlock() + + _, _, _, _, err = fs.LoadMsg(900) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } +}