From e5d2e77864fcf8c1a18fa372a36a380cfa2a36c9 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 16 Sep 2024 14:41:48 -0700 Subject: [PATCH] Improvements to dealing with old or non-existant index.db files during snapshot restore or restart. We had a condition where an old index.db was not able to properly restore a stream due to max msgs per subject being set and certain blocks being compacted away and removing subject info for those sequences. In addition we fixed recovery after Truncate and PurgeEx by subject when the index.db was corrupt or not available. This change also moves generating the index.db file to after writing the blocks during a snapshot and we do a force call to make sure it is written even when complex. Signed-off-by: Derek Collison --- server/filestore.go | 177 ++++++++++++++++++++++++++++++++------- server/filestore_test.go | 96 +++++++++++++++++++-- server/norace_test.go | 65 ++++++++++++++ 3 files changed, 300 insertions(+), 38 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 606c6143ae5..a0a77b5f713 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -436,6 +436,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Attempt to recover our state. err = fs.recoverFullState() if err != nil { + if !os.IsNotExist(err) { + fs.warn("Recovering stream state from index errored: %v", err) + } // Hold onto state prior := fs.state // Reset anything that could have been set from above. @@ -469,7 +472,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim go fs.cleanupOldMeta() }() - // Lock while do enforcements and removals. + // Lock while we do enforcements and removals. fs.mu.Lock() // Check if we have any left over tombstones to process. @@ -975,7 +978,6 @@ func (mb *msgBlock) ensureLastChecksumLoaded() { // Lock held on entry func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) { mb := fs.initMsgBlock(index) - // Open up the message file, but we will try to recover from the index file. // We will check that the last checksums match. file, err := mb.openBlock() @@ -1357,6 +1359,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { minTombstoneTs int64 ) + // To detect gaps from compaction. + var last uint64 + for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { if index+msgHdrSize > lbuf { truncate(index) @@ -1444,8 +1449,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { mb.bytes += uint64(rl) } + // Check for any gaps from compaction, meaning no ebit entry. + if last > 0 && seq != last+1 { + for dseq := last + 1; dseq < seq; dseq++ { + addToDmap(dseq) + } + } + // Always set last - atomic.StoreUint64(&mb.last.seq, seq) + last = seq + atomic.StoreUint64(&mb.last.seq, last) mb.last.ts = ts // Advance to next record. @@ -1665,7 +1678,8 @@ func (fs *fileStore) recoverFullState() (rerr error) { for i := 0; i < int(numBlocks); i++ { index, nbytes, fseq, fts, lseq, lts, numDeleted := uint32(readU64()), readU64(), readU64(), readI64(), readU64(), readI64(), readU64() if bi < 0 { - break + os.Remove(fn) + return errCorruptState } mb := fs.initMsgBlock(index) atomic.StoreUint64(&mb.first.seq, fseq) @@ -1734,6 +1748,12 @@ func (fs *fileStore) recoverFullState() (rerr error) { return errPriorState } if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched { + // If we are tracking max msgs per subject and we are not up to date we should rebuild. + if fs.cfg.MaxMsgsPer > 0 { + fs.warn("Stream state block state outdated, will rebuild") + return errPriorState + } + // Remove the last message block since recover will add in the new one. fs.removeMsgBlockFromList(mb) // Reverse update of tracking state for this mb, will add new state in below. @@ -1776,11 +1796,19 @@ func (fs *fileStore) recoverFullState() (rerr error) { // Update top level accounting if fseq := atomic.LoadUint64(&nmb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq { fs.state.FirstSeq = fseq - fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC() + if nmb.first.ts == 0 { + fs.state.FirstTime = time.Time{} + } else { + fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC() + } } if lseq := atomic.LoadUint64(&nmb.last.seq); lseq > fs.state.LastSeq { fs.state.LastSeq = lseq - fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC() + if mb.last.ts == 0 { + fs.state.LastTime = time.Time{} + } else { + fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC() + } } fs.state.Msgs += nmb.msgs fs.state.Bytes += nmb.bytes @@ -5415,7 +5443,8 @@ func (mb *msgBlock) ensureRawBytesLoaded() error { // Sync msg and index files as needed. This is called from a timer. func (fs *fileStore) syncBlocks() { fs.mu.RLock() - if fs.closed { + // If closed or a snapshot is in progress bail. + if fs.closed || fs.sips > 0 { fs.mu.RUnlock() return } @@ -6899,6 +6928,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint } var smv StoreMsg + var tombs []msgId fs.mu.Lock() // We may remove blocks as we purge, so don't range directly on fs.blks @@ -6952,9 +6982,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint purged++ bytes += rl } - // FSS updates. + // PSIM and FSS updates. mb.removeSeqPerSubject(sm.subj, seq) fs.removePerSubject(sm.subj) + // Track tombstones we need to write. + tombs = append(tombs, msgId{sm.seq, sm.ts}) // Check for first message. if seq == atomic.LoadUint64(&mb.first.seq) { @@ -6993,7 +7025,16 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint if firstSeqNeedsUpdate { fs.selectNextFirst() } + fseq := fs.state.FirstSeq + + // Write any tombstones as needed. + for _, tomb := range tombs { + if tomb.seq > fseq { + fs.lmb.writeTombstone(tomb.seq, tomb.ts) + } + } + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) fs.dirty++ cb := fs.scb fs.mu.Unlock() @@ -7036,7 +7077,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { fs.bim = make(map[uint32]*msgBlock) // Clear any per subject tracking. fs.psim, fs.tsl = fs.psim.Empty(), 0 - // Mark dirty + // Mark dirty. fs.dirty++ // Move the msgs directory out of the way, will delete out of band. @@ -7092,6 +7133,11 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { cb := fs.scb fs.mu.Unlock() + // Force a new index.db to be written. + if purged > 0 { + fs.forceWriteFullState() + } + if cb != nil { cb(-int64(purged), -rbytes, 0, _EMPTY_) } @@ -7286,11 +7332,19 @@ SKIP: } fs.state.Bytes -= bytes + // Any existing state file no longer applicable. We will force write a new one + // after we release the lock. + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) fs.dirty++ cb := fs.scb fs.mu.Unlock() + // Force a new index.db to be written. + if purged > 0 { + fs.forceWriteFullState() + } + if cb != nil && purged > 0 { cb(-int64(purged), -int64(bytes), 0, _EMPTY_) } @@ -7351,6 +7405,40 @@ func (fs *fileStore) reset() error { return nil } +// Return all active tombstones in this msgBlock. +// Write lock should be held. +func (mb *msgBlock) tombs() []msgId { + var tombs []msgId + + if !mb.cacheAlreadyLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + return nil + } + } + + var le = binary.LittleEndian + buf := mb.cache.buf + + for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { + if index+msgHdrSize > lbuf { + return tombs + } + hdr := buf[index : index+msgHdrSize] + rl, seq := le.Uint32(hdr[0:]), le.Uint64(hdr[4:]) + // Clear any headers bit that could be set. + rl &^= hbit + // Check for tombstones. + if seq&tbit != 0 { + ts := int64(le.Uint64(hdr[12:])) + tombs = append(tombs, msgId{seq &^ tbit, ts}) + } + // Advance to next record. + index += rl + } + + return tombs +} + // Truncate will truncate a stream store up to seq. Sequence needs to be valid. func (fs *fileStore) Truncate(seq uint64) error { // Check for request to reset. @@ -7386,6 +7474,10 @@ func (fs *fileStore) Truncate(seq uint64) error { fs.mu.Unlock() return err } + // Collect all tombstones, we want to put these back so we can survive + // a restore without index.db properly. + var tombs []msgId + tombs = append(tombs, nlmb.tombs()...) var purged, bytes uint64 @@ -7403,6 +7495,8 @@ func (fs *fileStore) Truncate(seq uint64) error { getLastMsgBlock := func() *msgBlock { return fs.blks[len(fs.blks)-1] } for mb := getLastMsgBlock(); mb != nlmb; mb = getLastMsgBlock() { mb.mu.Lock() + // We do this to load tombs. + tombs = append(tombs, mb.tombs()...) purged += mb.msgs bytes += mb.bytes fs.removeMsgBlock(mb) @@ -7425,11 +7519,29 @@ func (fs *fileStore) Truncate(seq uint64) error { // Reset our subject lookup info. fs.resetGlobalPerSubjectInfo() + // Always create new write block. + fs.newMsgBlockForWrite() + + // Write any tombstones as needed. + for _, tomb := range tombs { + if tomb.seq <= lsm.seq { + fs.lmb.writeTombstone(tomb.seq, tomb.ts) + } + } + + // Any existing state file no longer applicable. We will force write a new one + // after we release the lock. + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) fs.dirty++ cb := fs.scb fs.mu.Unlock() + // Force a new index.db to be written. + if purged > 0 { + fs.forceWriteFullState() + } + if cb != nil { cb(-int64(purged), -int64(bytes), 0, _EMPTY_) } @@ -8251,26 +8363,6 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) { msgPre := msgDir + "/" var bbuf []byte - const minLen = 32 - sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile) - if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen { - if fs.aek != nil { - ns := fs.aek.NonceSize() - buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil) - if err == nil { - // Redo hash checksum at end on plaintext. - fs.mu.Lock() - hh.Reset() - hh.Write(buf) - buf = fs.hh.Sum(buf) - fs.mu.Unlock() - } - } - if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil { - return - } - } - // Now do messages themselves. for _, mb := range blks { if mb.pendingWriteSize() > 0 { @@ -8309,6 +8401,30 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) { } } + // Do index.db last. We will force a write as well. + // Write out full state as well before proceeding. + if err := fs.forceWriteFullState(); err == nil { + const minLen = 32 + sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile) + if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen { + if fs.aek != nil { + ns := fs.aek.NonceSize() + buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil) + if err == nil { + // Redo hash checksum at end on plaintext. + fs.mu.Lock() + hh.Reset() + hh.Write(buf) + buf = fs.hh.Sum(buf) + fs.mu.Unlock() + } + } + if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil { + return + } + } + } + // Bail if no consumers requested. if !includeConsumers { return @@ -8381,9 +8497,6 @@ func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumer } } - // Write out full state as well before proceeding. - fs.writeFullState() - pr, pw := net.Pipe() // Set a write deadline here to protect ourselves. diff --git a/server/filestore_test.go b/server/filestore_test.go index 243155902f6..0868fcec02a 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1051,6 +1051,19 @@ func TestFileStoreStreamTruncate(t *testing.T) { mb := fs.getFirstBlock() require_True(t, mb != nil) require_NoError(t, mb.loadMsgs()) + + // Also make sure we can recover properly with no index.db present. + // We want to make sure we preserve tombstones from any blocks being deleted. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state of %+v, got %+v without index.db state", before, state) + } }) } @@ -3635,11 +3648,16 @@ func TestFileStorePurgeExWithSubject(t *testing.T) { testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { fcfg.BlockSize = 1000 cfg := StreamConfig{Name: "TEST", Subjects: []string{"foo.>"}, Storage: FileStorage} - fs, err := newFileStoreWithCreated(fcfg, cfg, time.Now(), prf(&fcfg), nil) + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) require_NoError(t, err) defer fs.Stop() payload := make([]byte, 20) + + _, _, err = fs.StoreMsg("foo.0", nil, payload) + require_NoError(t, err) + total := 200 for i := 0; i < total; i++ { _, _, err = fs.StoreMsg("foo.1", nil, payload) @@ -3648,13 +3666,38 @@ func TestFileStorePurgeExWithSubject(t *testing.T) { _, _, err = fs.StoreMsg("foo.2", nil, []byte("xxxxxx")) require_NoError(t, err) - // This should purge all. + // This should purge all "foo.1" p, err := fs.PurgeEx("foo.1", 1, 0) require_NoError(t, err) - require_True(t, int(p) == total) - require_True(t, int(p) == total) - require_True(t, fs.State().Msgs == 1) - require_True(t, fs.State().FirstSeq == 201) + require_Equal(t, p, uint64(total)) + + state := fs.State() + require_Equal(t, state.Msgs, 2) + require_Equal(t, state.FirstSeq, 1) + + // Make sure we can recover same state. + fs.Stop() + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + before := state + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state of %+v, got %+v", before, state) + } + + // Also make sure we can recover properly with no index.db present. + // We want to make sure we preserve any tombstones from the subject based purge. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state of %+v, got %+v without index.db state", before, state) + } }) } @@ -7557,6 +7600,47 @@ func TestFileStoreSyncCompressOnlyIfDirty(t *testing.T) { require_False(t, noCompact) } +// This test is for deleted interior message tracking after compaction from limits based deletes, meaning no tombstones. +// Bug was that dmap would not be properly be hydrated after the compact from rebuild. But we did so in populateGlobalInfo. +// So this is just to fix a bug in rebuildState tracking gaps after a compact. +func TestFileStoreDmapBlockRecoverAfterCompact(t *testing.T) { + sd := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: sd, BlockSize: 256}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1}) + require_NoError(t, err) + defer fs.Stop() + + msg := []byte("hello") + + // 6 msgs per block. + // Fill the first block. + for i := 1; i <= 6; i++ { + fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg) + } + require_Equal(t, fs.numMsgBlocks(), 1) + + // Now create holes in the first block via the max msgs per subject of 1. + for i := 2; i < 6; i++ { + fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg) + } + require_Equal(t, fs.numMsgBlocks(), 2) + // Compact and rebuild the first blk. Do not have it call indexCacheBuf which will fix it up. + mb := fs.getFirstBlock() + mb.mu.Lock() + mb.compact() + // Empty out dmap state. + mb.dmap.Empty() + ld, tombs, err := mb.rebuildStateLocked() + dmap := mb.dmap.Clone() + mb.mu.Unlock() + + require_NoError(t, err) + require_Equal(t, ld, nil) + require_Equal(t, len(tombs), 0) + require_Equal(t, dmap.Size(), 4) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/norace_test.go b/server/norace_test.go index 7de9c949eec..74e76dacdc6 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -10838,3 +10838,68 @@ func TestNoRaceJetStreamStandaloneDontReplyToAckBeforeProcessingIt(t *testing.T) } } } + +// Under certain scenarios an old index.db with a stream that has max msgs per set will not restore properly +// due to and old index.db and compaction after the index.db took place which could lose per subject information. +func TestNoRaceFileStoreMaxMsgsPerSubjectAndOldRecoverState(t *testing.T) { + sd := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: sd}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1}) + require_NoError(t, err) + defer fs.Stop() + + msg := make([]byte, 1024) + + for i := 0; i < 10_000; i++ { + subj := fmt.Sprintf("foo.%d", i) + fs.StoreMsg(subj, nil, msg) + } + + // This will write the index.db file. We will capture this and use it to replace a new one. + sfile := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile) + fs.Stop() + _, err = os.Stat(sfile) + require_NoError(t, err) + + // Read it in and make sure len > 0. + buf, err := os.ReadFile(sfile) + require_NoError(t, err) + require_True(t, len(buf) > 0) + + // Restart + fs, err = newFileStore( + FileStoreConfig{StoreDir: sd}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1}) + require_NoError(t, err) + defer fs.Stop() + + // Put in more messages with wider range. This will compact a bunch of the previous blocks. + for i := 0; i < 1_000_001; i++ { + subj := fmt.Sprintf("foo.%d", i) + fs.StoreMsg(subj, nil, msg) + } + + var ss StreamState + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, 10_001) + require_Equal(t, ss.LastSeq, 1_010_001) + require_Equal(t, ss.Msgs, 1_000_001) + + // Now stop again, but replace index.db with old one. + fs.Stop() + // Put back old stream state. + require_NoError(t, os.WriteFile(sfile, buf, defaultFilePerms)) + + // Restart + fs, err = newFileStore( + FileStoreConfig{StoreDir: sd}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1}) + require_NoError(t, err) + defer fs.Stop() + + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, 10_001) + require_Equal(t, ss.LastSeq, 1_010_001) + require_Equal(t, ss.Msgs, 1_000_001) +}