Skip to content

Commit

Permalink
Merge pull request #2490 from nats-io/fs-stable
Browse files Browse the repository at this point in the history
Fixed a bug that could lead to perceived message loss under JetStream.
  • Loading branch information
derekcollison authored Sep 7, 2021
2 parents 5396fbe + 2b2c4ba commit 963b0c4
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 139 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ before_script:
script:
- set -e
- if [[ $TRAVIS_TAG ]]; then go test -v -run=TestVersionMatchesTag ./server; fi
- if [[ ! $TRAVIS_TAG ]]; then go test -v -run=TestNoRace --failfast -p=1 ./...; fi
- if [[ ! $TRAVIS_TAG ]]; then if [[ "$TRAVIS_GO_VERSION" =~ 1.15 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast ./...; fi; fi
- if [[ ! $TRAVIS_TAG ]]; then go test -v -run=TestNoRace --failfast -p=1 -timeout 20m ./...; fi
- if [[ ! $TRAVIS_TAG ]]; then if [[ "$TRAVIS_GO_VERSION" =~ 1.15 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast -timeout 20m ./...; fi; fi
- set +e

deploy:
Expand Down
13 changes: 10 additions & 3 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2058,10 +2058,10 @@ func (o *consumer) getNextMsg() (subj string, hdr, msg []byte, seq uint64, dc ui
}
// We got an error here. If this is an EOF we will return, otherwise
// we can continue looking.
if err == ErrStoreEOF || err == ErrStoreClosed {
if err == ErrStoreEOF || err == ErrStoreClosed || err == errNoCache || err == errPartialCache {
return _EMPTY_, nil, nil, 0, 0, 0, err
}
// Skip since its probably deleted or expired.
// Skip since its deleted or expired.
o.sseq++
}
}
Expand Down Expand Up @@ -2254,7 +2254,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}

// We will wait here for new messages to arrive.
mch, outq, odsubj, sseq, dseq := o.mch, o.outq, o.cfg.DeliverSubject, o.sseq-1, o.dseq-1
mch, outq, odsubj := o.mch, o.outq, o.cfg.DeliverSubject
o.mu.Unlock()

select {
Expand All @@ -2269,6 +2269,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
case <-hbc:
if o.isActive() {
const t = "NATS/1.0 100 Idle Heartbeat\r\n%s: %d\r\n%s: %d\r\n\r\n"
sseq, dseq := o.lastDelivered()
hdr := []byte(fmt.Sprintf(t, JSLastConsumerSeq, dseq, JSLastStreamSeq, sseq))
if fcp := o.fcID(); fcp != _EMPTY_ {
// Add in that we are stalled on flow control here.
Expand All @@ -2283,6 +2284,12 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}
}

func (o *consumer) lastDelivered() (sseq, dseq uint64) {
o.mu.RLock()
defer o.mu.RUnlock()
return o.sseq - 1, o.dseq - 1
}

func (o *consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) string {
return fmt.Sprintf(o.ackReplyT, dc, sseq, dseq, ts, pending)
}
Expand Down
Loading

0 comments on commit 963b0c4

Please sign in to comment.