Skip to content

Commit

Permalink
fix: configure discard policy for WorkQueue/Interest (#1884)
Browse files Browse the repository at this point in the history
  • Loading branch information
kohlisid authored Jul 31, 2024
1 parent e7c32c1 commit 280b9bd
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,35 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b
if !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to query information of stream %q during buffer creating, %w", streamName, err)
}
// get the retention policy from the stream config
retention := nats.RetentionPolicy(v.GetInt("stream.retention"))
discard := nats.DiscardNew

// Based on the retention policy we use the following discard policy
// 1) Limits Policy -> DiscardOld
// 2) WorkQueuePolicy/Interest -> DiscardNew

// In WorkQueuePolicy the messages will be removed as soon as the Consumer received an Acknowledgement.
// In InterestPolicy messages will be removed as soon as all Consumers of the stream for that subject have
// received an Acknowledgement for the message.
// For Numaflow, workqueue and interest is the same, because we only have one consumer
// Old messages should be deleted once, they are acknowledged, hence we use DiscardNew with these two
// policies in which during a buffer full we will not write more message to the stream and wait
// for the older messages to get cleared

// When operating with DiscardNew and Limits, on reaching the maxMsgs limit, it will result in the stream
// returning an error when attempting to write new messages and old messages will not be deleted from the stream
// so the pipeline will get stuck. Hence, we cannot use Limits with DiscardNew.
//
if retention == nats.LimitsPolicy {
discard = nats.DiscardOld
}

if _, err := jss.js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamName}, // Use the stream name as the only subject
Retention: nats.RetentionPolicy(v.GetInt("stream.retention")),
Discard: nats.DiscardOld,
Retention: retention,
Discard: discard,
MaxMsgs: v.GetInt64("stream.maxMsgs"),
MaxAge: v.GetDuration("stream.maxAge"),
MaxBytes: v.GetInt64("stream.maxBytes"),
Expand Down

0 comments on commit 280b9bd

Please sign in to comment.