diff --git a/server/consumer.go b/server/consumer.go
index 3b7fe4dd63b..1f96e772e8c 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -4687,44 +4687,50 @@ func (o *consumer) selectStartingSeqNo() {
 			}
 		}
 	} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
-		// A threshold for when we switch from get last msg to subjects state.
-		const numSubjectsThresh = 256
-		lss := &lastSeqSkipList{resume: state.LastSeq}
-		var filters []string
-		if o.subjf == nil {
-			filters = append(filters, o.cfg.FilterSubject)
+		// If our parent stream is set to max msgs per subject of 1 this is just
+		// a normal consumer at this point. We can avoid any heavy lifting.
+		if o.mset.cfg.MaxMsgsPer == 1 {
+			o.sseq = state.FirstSeq
 		} else {
-			for _, filter := range o.subjf {
-				filters = append(filters, filter.subject)
+			// A threshold for when we switch from get last msg to subjects state.
+			const numSubjectsThresh = 256
+			lss := &lastSeqSkipList{resume: state.LastSeq}
+			var filters []string
+			if o.subjf == nil {
+				filters = append(filters, o.cfg.FilterSubject)
+			} else {
+				for _, filter := range o.subjf {
+					filters = append(filters, filter.subject)
+				}
 			}
-		}
-		for _, filter := range filters {
-			if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
-				var smv StoreMsg
-				for subj := range st {
-					if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
-						lss.seqs = append(lss.seqs, sm.seq)
+			for _, filter := range filters {
+				if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
+					var smv StoreMsg
+					for subj := range st {
+						if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
+							lss.seqs = append(lss.seqs, sm.seq)
+						}
+					}
+				} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
+					for _, ss := range mss {
+						lss.seqs = append(lss.seqs, ss.Last)
 					}
-				}
-			} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
-				for _, ss := range mss {
-					lss.seqs = append(lss.seqs, ss.Last)
 				}
 			}
+			// Sort the skip list if needed.
+			if len(lss.seqs) > 1 {
+				sort.Slice(lss.seqs, func(i, j int) bool {
+					return lss.seqs[j] > lss.seqs[i]
+				})
+			}
+			if len(lss.seqs) == 0 {
+				o.sseq = state.LastSeq
+			} else {
+				o.sseq = lss.seqs[0]
+			}
+			// Assign skip list.
+			o.lss = lss
 		}
-		// Sort the skip list if needed.
-		if len(lss.seqs) > 1 {
-			sort.Slice(lss.seqs, func(i, j int) bool {
-				return lss.seqs[j] > lss.seqs[i]
-			})
-		}
-		if len(lss.seqs) == 0 {
-			o.sseq = state.LastSeq
-		} else {
-			o.sseq = lss.seqs[0]
-		}
-		// Assign skip list.
-		o.lss = lss
 	} else if o.cfg.OptStartTime != nil {
 		// If we are here we are time based.
 		// TODO(dlc) - Once clustered can't rely on this.
diff --git a/server/filestore.go b/server/filestore.go
index 4fcbea4d759..3157e6784f4 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -2426,7 +2426,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
 	fs.mu.RLock()
 	defer fs.mu.RUnlock()
 
-	if fs.state.Msgs == 0 {
+	if fs.state.Msgs == 0 || fs.noTrackSubjects() {
 		return nil
 	}
 
@@ -2454,7 +2454,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
 		mb.mu.Lock()
 		var shouldExpire bool
-		if mb.cacheNotLoaded() {
+		if mb.fss == nil {
 			// Make sure we have fss loaded.
 			mb.loadMsgsWithLock()
 			shouldExpire = true
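Not part of the patch itself, but as a reviewer note, here is a minimal, self-contained Go sketch of the reasoning behind the consumer.go hunk. The names (`selectStart`, `lastPerSubject`, `skip`) are hypothetical stand-ins used only to model the behavior; they are not server APIs, and the fallback to the stream's last sequence when no subjects match is elided.

```go
package main

import (
	"fmt"
	"sort"
)

// selectStart is a toy stand-in (not the server code) for choosing the starting
// sequence of a deliver-last-per-subject consumer. lastPerSubject maps each
// subject to the sequence of its last stored message.
//
// When the stream keeps at most one message per subject (maxMsgsPer == 1),
// every remaining message is by definition the last one for its subject, so
// the consumer can simply start at the stream's first sequence and needs no
// skip list. Otherwise it sorts the per-subject last sequences ascending,
// starts at the smallest, and keeps the rest to skip ahead to.
func selectStart(lastPerSubject map[string]uint64, firstSeq uint64, maxMsgsPer int) (start uint64, skip []uint64) {
	if maxMsgsPer == 1 {
		return firstSeq, nil
	}
	for _, seq := range lastPerSubject {
		skip = append(skip, seq)
	}
	if len(skip) == 0 {
		return firstSeq, nil
	}
	sort.Slice(skip, func(i, j int) bool { return skip[i] < skip[j] })
	return skip[0], skip
}

func main() {
	last := map[string]uint64{"orders.1": 7, "orders.2": 3, "orders.3": 9}

	// Unlimited msgs per subject: build and sort the skip list, start at 3.
	fmt.Println(selectStart(last, 1, 0)) // 3 [3 7 9]

	// MaxMsgsPer == 1: the stream only holds sequences 3, 7 and 9 anyway,
	// so starting at the first sequence (3) delivers the same messages.
	fmt.Println(selectStart(last, 3, 1)) // 3 []
}
```

The ascending order mirrors the patch's `sort.Slice` comparator (`lss.seqs[j] > lss.seqs[i]` sorts the slice ascending), so `lss.seqs[0]` is the earliest last-per-subject sequence and the consumer skips forward from there.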