Skip to content

Commit

Permalink
Merge pull request #2482 from nats-io/js_expire_pull_reqs
Browse files Browse the repository at this point in the history
[FIXED] Pull requests: don't send 408 when request expires
  • Loading branch information
kozlovic authored Sep 2, 2021
2 parents 219a7c9 + ba36aa4 commit cd258e7
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (

const (
// VERSION is the current version for the server.
VERSION = "2.4.0"
VERSION = "2.4.1-beta.1"

// PROTO is the currently supported protocol.
// 0 was the original
Expand Down
4 changes: 2 additions & 2 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2089,7 +2089,7 @@ func (o *consumer) expireWaiting() int {
now := time.Now()
for wr := o.waiting.peek(); wr != nil; wr = o.waiting.peek() {
if !wr.expires.IsZero() && now.After(wr.expires) {
o.forceExpireFirstWaiting()
o.waiting.pop()
expired++
continue
}
Expand All @@ -2103,7 +2103,7 @@ func (o *consumer) expireWaiting() int {
break
}
// No more interest so go ahead and remove this one from our list.
o.forceExpireFirstWaiting()
o.waiting.pop()
expired++
}
return expired
Expand Down
18 changes: 10 additions & 8 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1843,23 +1843,25 @@ func TestJetStreamWorkQueueRequest(t *testing.T) {
// Now do expiration
req.Batch = 1
req.NoWait = false
req.Expires = 10 * time.Millisecond
req.Expires = 100 * time.Millisecond
jreq, _ = json.Marshal(req)

nc.PublishRequest(getSubj, reply, jreq)
// Let it expire
time.Sleep(20 * time.Millisecond)
time.Sleep(200 * time.Millisecond)

// Send a few more messages. These should not be delivered to the sub.
sendStreamMsg(t, nc, "foo", "Hello World!")
sendStreamMsg(t, nc, "bar", "Hello World!")
// We will have an alert here.
time.Sleep(100 * time.Millisecond)
checkSubPending(0)

// Send a new request, we should not get the 408 because our previous request
// should have expired.
nc.PublishRequest(getSubj, reply, jreq)
checkSubPending(1)
m, _ := sub.NextMsg(0)
// Make sure this is an alert that tells us our request is now stale.
if m.Header.Get("Status") != "408" {
t.Fatalf("Expected a 408 status code, got %q", m.Header.Get("Status"))
}
sub.NextMsg(time.Second)
checkSubPending(0)
})
}
}
Expand Down

0 comments on commit cd258e7

Please sign in to comment.