
[FIXED] Improvements to dealing with old or non-existent index.db #5893

Merged 1 commit on Sep 17, 2024
Changes from all commits
177 changes: 145 additions & 32 deletions server/filestore.go
@@ -436,6 +436,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Attempt to recover our state.
err = fs.recoverFullState()
if err != nil {
if !os.IsNotExist(err) {
fs.warn("Recovering stream state from index errored: %v", err)
}
// Hold onto state
prior := fs.state
// Reset anything that could have been set from above.
@@ -469,7 +472,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
go fs.cleanupOldMeta()
}()

- // Lock while do enforcements and removals.
// Lock while we do enforcements and removals.
fs.mu.Lock()

// Check if we have any left over tombstones to process.
@@ -975,7 +978,6 @@ func (mb *msgBlock) ensureLastChecksumLoaded() {
// Lock held on entry
func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) {
mb := fs.initMsgBlock(index)

// Open up the message file, but we will try to recover from the index file.
// We will check that the last checksums match.
file, err := mb.openBlock()
@@ -1357,6 +1359,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
minTombstoneTs int64
)

// To detect gaps from compaction.
var last uint64

for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize > lbuf {
truncate(index)
@@ -1444,8 +1449,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
mb.bytes += uint64(rl)
}

// Check for any gaps from compaction, meaning no ebit entry.
if last > 0 && seq != last+1 {
for dseq := last + 1; dseq < seq; dseq++ {
addToDmap(dseq)
}
}

// Always set last
- atomic.StoreUint64(&mb.last.seq, seq)
last = seq
atomic.StoreUint64(&mb.last.seq, last)
mb.last.ts = ts

// Advance to next record.
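The gap handling added above can be seen in isolation with a small standalone sketch (hypothetical code; `addToDmap` stands in for the closure of the same name in `rebuildStateLocked`): any sequences skipped between consecutive records of a compacted block, which left no erase-bit entries behind, are recorded as deleted.

```go
package main

import "fmt"

func main() {
	// Deleted-sequence map, as addToDmap maintains in rebuildStateLocked.
	dmap := map[uint64]struct{}{}
	addToDmap := func(seq uint64) { dmap[seq] = struct{}{} }

	// Sequences as they might appear in a compacted block, with holes
	// left by compaction and no ebit entries for the removed messages.
	seqs := []uint64{10, 11, 14, 15, 18}

	var last uint64
	for _, seq := range seqs {
		if last > 0 && seq != last+1 {
			for dseq := last + 1; dseq < seq; dseq++ {
				addToDmap(dseq)
			}
		}
		last = seq
	}
	fmt.Println(dmap) // map[12:{} 13:{} 16:{} 17:{}]
}
```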
@@ -1665,7 +1678,8 @@ func (fs *fileStore) recoverFullState() (rerr error) {
for i := 0; i < int(numBlocks); i++ {
index, nbytes, fseq, fts, lseq, lts, numDeleted := uint32(readU64()), readU64(), readU64(), readI64(), readU64(), readI64(), readU64()
if bi < 0 {
- break
os.Remove(fn)
return errCorruptState
}
mb := fs.initMsgBlock(index)
atomic.StoreUint64(&mb.first.seq, fseq)
@@ -1734,6 +1748,12 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// If we are tracking max msgs per subject and we are not up to date, we should rebuild.
if fs.cfg.MaxMsgsPer > 0 {
fs.warn("Stream state block state outdated, will rebuild")
return errPriorState
}

// Remove the last message block since recover will add in the new one.
fs.removeMsgBlockFromList(mb)
// Reverse update of tracking state for this mb, will add new state in below.
@@ -1776,11 +1796,19 @@
// Update top level accounting
if fseq := atomic.LoadUint64(&nmb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq {
fs.state.FirstSeq = fseq
- fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC()
if nmb.first.ts == 0 {
fs.state.FirstTime = time.Time{}
} else {
fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC()
}
}
if lseq := atomic.LoadUint64(&nmb.last.seq); lseq > fs.state.LastSeq {
fs.state.LastSeq = lseq
- fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC()
if nmb.last.ts == 0 {
fs.state.LastTime = time.Time{}
} else {
fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC()
}
}
fs.state.Msgs += nmb.msgs
fs.state.Bytes += nmb.bytes
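The zero-timestamp guards added above matter because `time.Unix(0, 0)` is the Unix epoch rather than Go's zero time, so an unset block timestamp would otherwise surface as 1970-01-01. A quick standalone check:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var ts int64 // an unset block timestamp, e.g. nmb.first.ts == 0

	// Without the guard, an unset timestamp becomes the epoch.
	fmt.Println(time.Unix(0, ts).UTC())          // 1970-01-01 00:00:00 +0000 UTC
	fmt.Println(time.Unix(0, ts).UTC().IsZero()) // false

	// With the guard, it stays the zero time, i.e. "no time recorded".
	var first time.Time
	if ts == 0 {
		first = time.Time{}
	} else {
		first = time.Unix(0, ts).UTC()
	}
	fmt.Println(first.IsZero()) // true
}
```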
@@ -5415,7 +5443,8 @@ func (mb *msgBlock) ensureRawBytesLoaded() error {
// Sync msg and index files as needed. This is called from a timer.
func (fs *fileStore) syncBlocks() {
fs.mu.RLock()
- if fs.closed {
// If closed or a snapshot is in progress, bail.
if fs.closed || fs.sips > 0 {
fs.mu.RUnlock()
return
}
@@ -6899,6 +6928,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
}

var smv StoreMsg
var tombs []msgId

fs.mu.Lock()
// We may remove blocks as we purge, so don't range directly on fs.blks
@@ -6952,9 +6982,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
purged++
bytes += rl
}
- // FSS updates.
// PSIM and FSS updates.
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
// Track tombstones we need to write.
tombs = append(tombs, msgId{sm.seq, sm.ts})

// Check for first message.
if seq == atomic.LoadUint64(&mb.first.seq) {
@@ -6993,7 +7025,16 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
if firstSeqNeedsUpdate {
fs.selectNextFirst()
}
fseq := fs.state.FirstSeq

// Write any tombstones as needed.
for _, tomb := range tombs {
if tomb.seq > fseq {
fs.lmb.writeTombstone(tomb.seq, tomb.ts)
}
}

os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++
cb := fs.scb
fs.mu.Unlock()
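A minimal sketch of the filtering rule above (hypothetical values; `msgId` mirrors the seq/ts pair used in this file): only tombstones whose sequence is still beyond the stream's first sequence after the purge are worth persisting, since anything at or below it is already implied deleted. `Truncate` further down applies the mirror-image condition (`tomb.seq <= lsm.seq`).

```go
package main

import "fmt"

type msgId struct {
	seq uint64
	ts  int64
}

func main() {
	fseq := uint64(100) // fs.state.FirstSeq after the purge
	tombs := []msgId{{90, 1}, {100, 2}, {101, 3}, {250, 4}}

	// Keep only tombstones above the new first sequence; these are the
	// ones PurgeEx would re-write via writeTombstone.
	var keep []msgId
	for _, tomb := range tombs {
		if tomb.seq > fseq {
			keep = append(keep, tomb)
		}
	}
	fmt.Println(keep) // [{101 3} {250 4}]
}
```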
@@ -7036,7 +7077,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
fs.bim = make(map[uint32]*msgBlock)
// Clear any per subject tracking.
fs.psim, fs.tsl = fs.psim.Empty(), 0
- // Mark dirty
// Mark dirty.
fs.dirty++

// Move the msgs directory out of the way, will delete out of band.
@@ -7092,6 +7133,11 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
cb := fs.scb
fs.mu.Unlock()

// Force a new index.db to be written.
if purged > 0 {
fs.forceWriteFullState()
}

if cb != nil {
cb(-int64(purged), -rbytes, 0, _EMPTY_)
}
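The ordering here follows a pattern used throughout this change, sketched below under the assumption (not shown in this diff) that `forceWriteFullState` takes `fs.mu` internally: mutate state and capture the callback while holding the lock, then rewrite index.db and invoke the callback only after `Unlock`, avoiding self-deadlock.

```go
package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu     sync.Mutex
	purged uint64
}

// Stand-in for forceWriteFullState; assumed to take the lock itself,
// which is why it must run after Unlock in purge below.
func (s *store) forceWriteFullState() {
	s.mu.Lock()
	defer s.mu.Unlock()
	fmt.Println("rewrote index.db")
}

func (s *store) purge(cb func(purged uint64)) {
	s.mu.Lock()
	s.purged = 42 // ... remove blocks, reset per-subject tracking ...
	s.mu.Unlock()

	if s.purged > 0 {
		s.forceWriteFullState()
	}
	if cb != nil {
		cb(s.purged)
	}
}

func main() {
	(&store{}).purge(func(n uint64) { fmt.Println("purged", n) })
}
```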
@@ -7286,11 +7332,19 @@ SKIP:
}
fs.state.Bytes -= bytes

// Any existing state file no longer applicable. We will force write a new one
// after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++

cb := fs.scb
fs.mu.Unlock()

// Force a new index.db to be written.
if purged > 0 {
fs.forceWriteFullState()
}

if cb != nil && purged > 0 {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
@@ -7351,6 +7405,40 @@ func (fs *fileStore) reset() error {
return nil
}

// Return all active tombstones in this msgBlock.
// Write lock should be held.
func (mb *msgBlock) tombs() []msgId {
var tombs []msgId

if !mb.cacheAlreadyLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return nil
}
}

var le = binary.LittleEndian
buf := mb.cache.buf

for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize > lbuf {
return tombs
}
hdr := buf[index : index+msgHdrSize]
rl, seq := le.Uint32(hdr[0:]), le.Uint64(hdr[4:])
// Clear any headers bit that could be set.
rl &^= hbit
// Check for tombstones.
if seq&tbit != 0 {
ts := int64(le.Uint64(hdr[12:]))
tombs = append(tombs, msgId{seq &^ tbit, ts})
}
// Advance to next record.
index += rl
}

return tombs
}
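For reference, a self-contained sketch of the record header layout that `tombs()` decodes; the `hbit` and `tbit` values mirror the constants used in filestore.go (a headers flag in the length word, a tombstone flag in the sequence word), while the full on-disk record also carries a subject and checksum, elided here.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	hbit = 1 << 31 // set in the record length for messages with headers
	tbit = 1 << 63 // set in the sequence for tombstone records
)

func main() {
	le := binary.LittleEndian

	// Encode a tombstone for sequence 7 at timestamp 12345:
	// rl uint32 | seq uint64 | ts uint64 (first 20 header bytes).
	hdr := make([]byte, 20)
	le.PutUint32(hdr[0:], 20)
	le.PutUint64(hdr[4:], 7|tbit)
	le.PutUint64(hdr[12:], 12345)

	// Decode it the way tombs() does.
	rl, seq := le.Uint32(hdr[0:]), le.Uint64(hdr[4:])
	rl &^= hbit // clear any headers bit
	if seq&tbit != 0 {
		fmt.Printf("tombstone seq=%d ts=%d rl=%d\n", seq&^tbit, int64(le.Uint64(hdr[12:])), rl)
	}
}
```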

// Truncate will truncate a stream store up to seq. Sequence needs to be valid.
func (fs *fileStore) Truncate(seq uint64) error {
// Check for request to reset.
@@ -7386,6 +7474,10 @@ func (fs *fileStore) Truncate(seq uint64) error {
fs.mu.Unlock()
return err
}
// Collect all tombstones; we want to put these back so we can properly
// survive a restore without index.db.
var tombs []msgId
tombs = append(tombs, nlmb.tombs()...)

var purged, bytes uint64

@@ -7403,6 +7495,8 @@ func (fs *fileStore) Truncate(seq uint64) error {
getLastMsgBlock := func() *msgBlock { return fs.blks[len(fs.blks)-1] }
for mb := getLastMsgBlock(); mb != nlmb; mb = getLastMsgBlock() {
mb.mu.Lock()
// We do this to load tombs.
tombs = append(tombs, mb.tombs()...)
purged += mb.msgs
bytes += mb.bytes
fs.removeMsgBlock(mb)
@@ -7425,11 +7519,29 @@
// Reset our subject lookup info.
fs.resetGlobalPerSubjectInfo()

// Always create new write block.
fs.newMsgBlockForWrite()

// Write any tombstones as needed.
for _, tomb := range tombs {
if tomb.seq <= lsm.seq {
fs.lmb.writeTombstone(tomb.seq, tomb.ts)
}
}

// Any existing state file no longer applicable. We will force write a new one
// after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++

cb := fs.scb
fs.mu.Unlock()

// Force a new index.db to be written.
if purged > 0 {
fs.forceWriteFullState()
}

if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
@@ -8251,26 +8363,6 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) {
msgPre := msgDir + "/"
var bbuf []byte

- const minLen = 32
- sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
- if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen {
- if fs.aek != nil {
- ns := fs.aek.NonceSize()
- buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil)
- if err == nil {
- // Redo hash checksum at end on plaintext.
- fs.mu.Lock()
- hh.Reset()
- hh.Write(buf)
- buf = fs.hh.Sum(buf)
- fs.mu.Unlock()
- }
- }
- if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil {
- return
- }
- }

// Now do messages themselves.
for _, mb := range blks {
if mb.pendingWriteSize() > 0 {
@@ -8309,6 +8401,30 @@
}
}

// Do index.db last. We will force a write as well.
// Write out full state as well before proceeding.
if err := fs.forceWriteFullState(); err == nil {
const minLen = 32
sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen {
if fs.aek != nil {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil)
if err == nil {
// Redo hash checksum at end on plaintext.
fs.mu.Lock()
hh.Reset()
hh.Write(buf)
buf = fs.hh.Sum(buf)
fs.mu.Unlock()
}
}
if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil {
return
}
}
}

// Bail if no consumers requested.
if !includeConsumers {
return
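The encrypted branch above can be sketched standalone (hypothetical keys; assumes, per the slicing in the code above, that the state file is laid out as nonce || ciphertext || 8-byte highwayhash checksum, with the checksum recomputed over the plaintext copy placed in the snapshot):

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"

	"github.com/minio/highwayhash"
)

func main() {
	key := make([]byte, 32)
	rand.Read(key)
	block, _ := aes.NewCipher(key)
	aek, _ := cipher.NewGCM(block) // stands in for fs.aek
	hh, _ := highwayhash.New64(key)

	// Build a file image: nonce || ciphertext || trailing checksum.
	state := []byte("full stream state bytes")
	nonce := make([]byte, aek.NonceSize())
	rand.Read(nonce)
	file := append(append([]byte{}, nonce...), aek.Seal(nil, nonce, state, nil)...)
	file = append(file, make([]byte, highwayhash.Size64)...) // checksum contents elided

	// Decrypt the way streamSnapshot does: nonce prefix, checksum suffix.
	ns := aek.NonceSize()
	pt, err := aek.Open(nil, file[:ns], file[ns:len(file)-highwayhash.Size64], nil)
	if err != nil {
		panic(err)
	}

	// Redo the checksum on the plaintext before writing it into the snapshot.
	hh.Reset()
	hh.Write(pt)
	pt = hh.Sum(pt)
	fmt.Printf("snapshot copy: %d bytes (plaintext + %d-byte checksum)\n", len(pt), highwayhash.Size64)
}
```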
@@ -8381,9 +8497,6 @@ func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumer
}
}

- // Write out full state as well before proceeding.
- fs.writeFullState()

pr, pw := net.Pipe()

// Set a write deadline here to protect ourselves.