diff --git a/server/filestore.go b/server/filestore.go index bc16f6fb7fd..d76dc70c114 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1749,56 +1749,32 @@ func (fs *fileStore) recoverFullState() (rerr error) { } if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched { // Detected a stale index.db, we didn't write it upon shutdown so can't rely on it being correct. - fs.warn("Stream state outdated, will rebuild") + fs.warn("Stream state outdated, last block has additional entries, will rebuild") return errPriorState } - // On success double check our state. - checkState := func() error { - // We check first and last seq and number of msgs and bytes. If there is a difference, - // return and error so we rebuild from the message block state on disk. - if !trackingStatesEqual(&fs.state, &mstate) { - fs.warn("Stream state encountered internal inconsistency on recover") - os.Remove(fn) - return errCorruptState - } - return nil + // We need to see if any blocks exist after our last one even though we matched the last record exactly. + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) + var dirs []os.DirEntry + + <-dios + if f, err := os.Open(mdir); err == nil { + dirs, _ = f.ReadDir(-1) + f.Close() } + dios <- struct{}{} - // We may need to check other blocks. Even if we matched last checksum we will see if there is another block. - for bi := blkIndex + 1; ; bi++ { - nmb, err := fs.recoverMsgBlock(bi) - if err != nil { - if os.IsNotExist(err) { - return checkState() - } - os.Remove(fn) - fs.warn("Stream state could not recover msg block %d", bi) - return err - } - if nmb != nil { - // Update top level accounting - if fseq := atomic.LoadUint64(&nmb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq { - fs.state.FirstSeq = fseq - if nmb.first.ts == 0 { - fs.state.FirstTime = time.Time{} - } else { - fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC() - } - } - if lseq := atomic.LoadUint64(&nmb.last.seq); lseq > fs.state.LastSeq { - fs.state.LastSeq = lseq - if mb.last.ts == 0 { - fs.state.LastTime = time.Time{} - } else { - fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC() - } + var index uint32 + for _, fi := range dirs { + if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 { + if index > blkIndex { + fs.warn("Stream state outdated, found extra blocks, will rebuild") + return errPriorState } - fs.state.Msgs += nmb.msgs - fs.state.Bytes += nmb.bytes - updateTrackingState(&mstate, nmb) } } + + return nil } // Grabs last checksum for the named block file. diff --git a/server/filestore_test.go b/server/filestore_test.go index 0868fcec02a..cb3f65ade9e 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -7641,6 +7641,72 @@ func TestFileStoreDmapBlockRecoverAfterCompact(t *testing.T) { require_Equal(t, dmap.Size(), 4) } +func TestFileStoreRestoreIndexWithMatchButLeftOverBlocks(t *testing.T) { + sd := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: sd, BlockSize: 256}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1}) + require_NoError(t, err) + defer fs.Stop() + + msg := []byte("hello") + + // 6 msgs per block. + // Fill the first 2 blocks. + for i := 1; i <= 12; i++ { + fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg) + } + require_Equal(t, fs.numMsgBlocks(), 2) + + // We will now stop which will create the index.db file which will + // match the last record exactly. + sfile := filepath.Join(sd, msgDir, streamStreamStateFile) + fs.Stop() + + // Grab it since we will put it back. + buf, err := os.ReadFile(sfile) + require_NoError(t, err) + require_True(t, len(buf) > 0) + + // Now do an additional block, but with the MaxMsgsPer this will remove the first block, + // but leave the second so on recovery will match the checksum for the last msg in second block. + + fs, err = newFileStore( + FileStoreConfig{StoreDir: sd, BlockSize: 256}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1}) + require_NoError(t, err) + defer fs.Stop() + + for i := 1; i <= 6; i++ { + fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg) + } + + // Grab correct state, we will use it to make sure we do the right thing. + var state StreamState + fs.FastState(&state) + + require_Equal(t, state.Msgs, 12) + require_Equal(t, state.FirstSeq, 7) + require_Equal(t, state.LastSeq, 18) + // This will be block 2 and 3. + require_Equal(t, fs.numMsgBlocks(), 2) + + fs.Stop() + // Put old stream state back. + require_NoError(t, os.WriteFile(sfile, buf, defaultFilePerms)) + + fs, err = newFileStore( + FileStoreConfig{StoreDir: sd, BlockSize: 256}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1}) + require_NoError(t, err) + defer fs.Stop() + + fs.FastState(&state) + require_Equal(t, state.Msgs, 12) + require_Equal(t, state.FirstSeq, 7) + require_Equal(t, state.LastSeq, 18) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks ///////////////////////////////////////////////////////////////////////////