From d1b6ec8064ea04e5aa14e62b81fc9fa1eb81cf61 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Fri, 15 Mar 2024 04:14:14 -0400 Subject: [PATCH] [ADDED] Pause and resume jetstream consumer (#1571) Signed-off-by: Yordis Prieto --- jetstream/api.go | 3 + jetstream/consumer.go | 38 ++++++++++ jetstream/consumer_config.go | 10 +++ jetstream/jetstream.go | 20 +++++ jetstream/stream.go | 34 +++++++++ jetstream/test/consumer_test.go | 9 +++ jetstream/test/stream_test.go | 129 ++++++++++++++++++++++++++++++++ 7 files changed, 243 insertions(+) diff --git a/jetstream/api.go b/jetstream/api.go index 1cea088ed..940b1be7c 100644 --- a/jetstream/api.go +++ b/jetstream/api.go @@ -63,6 +63,9 @@ const ( // apiConsumerDeleteT is used to delete consumers. apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s" + // apiConsumerPauseT is used to pause a consumer. + apiConsumerPauseT = "CONSUMER.PAUSE.%s.%s" + // apiConsumerListT is used to return all detailed consumer information apiConsumerListT = "CONSUMER.LIST.%s" diff --git a/jetstream/consumer.go b/jetstream/consumer.go index ee48a1ec3..d7a8e7739 100644 --- a/jetstream/consumer.go +++ b/jetstream/consumer.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/nats-io/nats.go/internal/syncx" "github.com/nats-io/nuid" @@ -321,6 +322,43 @@ func deleteConsumer(ctx context.Context, js *jetStream, stream, consumer string) return nil } +func pauseConsumer(ctx context.Context, js *jetStream, stream, consumer string, pauseUntil *time.Time) (*ConsumerPauseResponse, error) { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } + if err := validateConsumerName(consumer); err != nil { + return nil, err + } + subject := apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerPauseT, stream, consumer)) + + var resp consumerPauseApiResponse + req, err := json.Marshal(consumerPauseRequest{ + PauseUntil: pauseUntil, + }) + if err != nil { + return nil, err + } + if _, err := js.apiRequestJSON(ctx, subject, &resp, req); err != nil { + return nil, err + } + if resp.Error != nil { + if resp.Error.ErrorCode == JSErrCodeConsumerNotFound { + return nil, ErrConsumerNotFound + } + return nil, resp.Error + } + return &ConsumerPauseResponse{ + Paused: resp.Paused, + PauseUntil: resp.PauseUntil, + PauseRemaining: resp.PauseRemaining, + }, nil +} + +func resumeConsumer(ctx context.Context, js *jetStream, stream, consumer string) (*ConsumerPauseResponse, error) { + return pauseConsumer(ctx, js, stream, consumer, nil) +} + func validateConsumerName(dur string) error { if dur == "" { return fmt.Errorf("%w: '%s'", ErrInvalidConsumerName, "name is required") diff --git a/jetstream/consumer_config.go b/jetstream/consumer_config.go index 4e2e3d6e0..5d419cdfa 100644 --- a/jetstream/consumer_config.go +++ b/jetstream/consumer_config.go @@ -75,6 +75,13 @@ type ( // TimeStamp indicates when the info was gathered by the server. TimeStamp time.Time `json:"ts"` + + // Paused indicates whether the consumer is paused. + Paused bool `json:"paused,omitempty"` + + // PauseRemaining contains the amount of time left until the consumer + // unpauses. It will only be non-zero if the consumer is currently paused. + PauseRemaining time.Duration `json:"pause_remaining,omitempty"` } // ConsumerConfig is the configuration of a JetStream consumer. @@ -217,6 +224,9 @@ type ( // associating metadata on the consumer. This feature requires // nats-server v2.10.0 or later. Metadata map[string]string `json:"metadata,omitempty"` + + // PauseUntil is for suspending the consumer until the deadline. + PauseUntil *time.Time `json:"pause_until,omitempty"` } // OrderedConsumerConfig is the configuration of an ordered JetStream diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index e401cb926..55f8c97e3 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -196,6 +196,12 @@ type ( // DeleteConsumer removes a consumer with given name from a stream. // If consumer does not exist, ErrConsumerNotFound is returned. DeleteConsumer(ctx context.Context, stream string, consumer string) error + + // PauseConsumer pauses a consumer until the given time. + PauseConsumer(ctx context.Context, stream string, consumer string, pauseUntil time.Time) (*ConsumerPauseResponse, error) + + // ResumeConsumer resumes a paused consumer. + ResumeConsumer(ctx context.Context, stream string, consumer string) (*ConsumerPauseResponse, error) } // StreamListOpt is a functional option for [StreamManager.ListStreams] and @@ -781,6 +787,20 @@ func (js *jetStream) DeleteConsumer(ctx context.Context, stream string, name str return deleteConsumer(ctx, js, stream, name) } +func (js *jetStream) PauseConsumer(ctx context.Context, stream string, consumer string, pauseUntil time.Time) (*ConsumerPauseResponse, error) { + if err := validateStreamName(stream); err != nil { + return nil, err + } + return pauseConsumer(ctx, js, stream, consumer, &pauseUntil) +} + +func (js *jetStream) ResumeConsumer(ctx context.Context, stream string, consumer string) (*ConsumerPauseResponse, error) { + if err := validateStreamName(stream); err != nil { + return nil, err + } + return resumeConsumer(ctx, js, stream, consumer) +} + func validateStreamName(stream string) error { if stream == "" { return ErrStreamNameRequired diff --git a/jetstream/stream.go b/jetstream/stream.go index 4741a51c4..f170b5739 100644 --- a/jetstream/stream.go +++ b/jetstream/stream.go @@ -101,6 +101,12 @@ type ( // If consumer does not exist, ErrConsumerNotFound is returned. DeleteConsumer(ctx context.Context, consumer string) error + // PauseConsumer pauses a consumer. + PauseConsumer(ctx context.Context, consumer string, pauseUntil time.Time) (*ConsumerPauseResponse, error) + + // ResumeConsumer resumes a consumer. + ResumeConsumer(ctx context.Context, consumer string) (*ConsumerPauseResponse, error) + // ListConsumers returns ConsumerInfoLister enabling iterating over a // channel of consumer infos. ListConsumers(context.Context) ConsumerInfoLister @@ -163,6 +169,24 @@ type ( Success bool `json:"success,omitempty"` } + consumerPauseRequest struct { + PauseUntil *time.Time `json:"pause_until,omitempty"` + } + + ConsumerPauseResponse struct { + // Paused is true if the consumer is paused. + Paused bool `json:"paused"` + // PauseUntil is the time until the consumer is paused. + PauseUntil time.Time `json:"pause_until"` + // PauseRemaining is the time remaining until the consumer is paused. + PauseRemaining time.Duration `json:"pause_remaining,omitempty"` + } + + consumerPauseApiResponse struct { + apiResponse + ConsumerPauseResponse + } + // GetMsgOpt is a function setting options for [Stream.GetMsg] GetMsgOpt func(*apiMsgGetRequest) error @@ -296,6 +320,16 @@ func (s *stream) DeleteConsumer(ctx context.Context, name string) error { return deleteConsumer(ctx, s.jetStream, s.name, name) } +// PauseConsumer pauses a consumer. +func (s *stream) PauseConsumer(ctx context.Context, name string, pauseUntil time.Time) (*ConsumerPauseResponse, error) { + return pauseConsumer(ctx, s.jetStream, s.name, name, &pauseUntil) +} + +// ResumeConsumer resumes a consumer. +func (s *stream) ResumeConsumer(ctx context.Context, name string) (*ConsumerPauseResponse, error) { + return resumeConsumer(ctx, s.jetStream, s.name, name) +} + // Info returns StreamInfo from the server. func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error) { ctx, cancel := wrapContextWithoutDeadline(ctx) diff --git a/jetstream/test/consumer_test.go b/jetstream/test/consumer_test.go index bae2ff494..3ad59bb9d 100644 --- a/jetstream/test/consumer_test.go +++ b/jetstream/test/consumer_test.go @@ -64,6 +64,15 @@ func TestConsumerInfo(t *testing.T) { if info.Config.Description != "test consumer" { t.Fatalf("Invalid consumer description; expected: 'test consumer'; got: %s", info.Config.Description) } + if info.Config.PauseUntil != nil { + t.Fatalf("Consumer should not be paused") + } + if info.Paused != false { + t.Fatalf("Consumer should not be paused") + } + if info.PauseRemaining != 0 { + t.Fatalf("Consumer should not be paused") + } // update consumer and see if info is updated _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ diff --git a/jetstream/test/stream_test.go b/jetstream/test/stream_test.go index b7278b39b..632e9b3d2 100644 --- a/jetstream/test/stream_test.go +++ b/jetstream/test/stream_test.go @@ -1483,3 +1483,132 @@ func TestPurgeStream(t *testing.T) { }) } } + +func TestPauseConsumer(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + defer nc.Close() + + s, err := js.CreateStream(context.TODO(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + t.Run("create a paused consumer", func(t *testing.T) { + const consumerName = "durr" + pauseUntil := time.Now().Add(1 * time.Minute) + consumer, err := s.CreateOrUpdateConsumer(context.TODO(), jetstream.ConsumerConfig{ + Durable: consumerName, + AckPolicy: jetstream.AckAllPolicy, + Description: "desc", + PauseUntil: &pauseUntil, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + info, err := consumer.Info(context.TODO()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !info.Paused { + t.Fatalf("Consumer should be paused") + } + if info.PauseRemaining <= time.Duration(0) { + t.Fatalf("PauseRemaining should be greater than 0") + } + }) + + t.Run("pausing a consumer that does not exists", func(t *testing.T) { + const consumerName = "durr1" + pauseUntil := time.Now().Add(1 * time.Minute) + _, err := s.PauseConsumer(context.TODO(), consumerName, pauseUntil) + if err == nil { + t.Fatalf("Expected error; got: %v", err) + } + if !errors.Is(err, jetstream.ErrConsumerNotFound) { + t.Fatalf("Expected error: %v; got: %v", jetstream.ErrConsumerNotFound, err) + } + }) + + t.Run("pausing consumer", func(t *testing.T) { + const consumerName = "durr2" + consumer, err := s.CreateOrUpdateConsumer(context.TODO(), jetstream.ConsumerConfig{ + Durable: consumerName, + AckPolicy: jetstream.AckAllPolicy, + Description: "desc", + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + info, err := consumer.Info(context.TODO()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if info.Paused { + t.Fatalf("Consumer should not be paused") + } + + pauseUntil := time.Now().Add(1 * time.Minute) + resp, err := s.PauseConsumer(context.TODO(), consumerName, pauseUntil) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if !resp.Paused { + t.Fatalf("Consumer should be paused") + } + if !resp.PauseUntil.Equal(pauseUntil) { + t.Fatalf("Invalid pause until; want: %v; got: %v", pauseUntil, resp.PauseUntil) + } + if resp.PauseRemaining <= time.Duration(0) { + t.Fatalf("PauseRemaining should be greater than 0") + } + }) + + t.Run("resuming consumer", func(t *testing.T) { + const consumerName = "durr3" + pauseUntil := time.Now().Add(20 * time.Minute) + consumer, err := s.CreateOrUpdateConsumer(context.TODO(), jetstream.ConsumerConfig{ + Durable: consumerName, + AckPolicy: jetstream.AckAllPolicy, + Description: "desc", + PauseUntil: &pauseUntil, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + info, err := consumer.Info(context.TODO()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !info.Paused { + t.Fatalf("Consumer should be paused") + } + + resp, err := s.ResumeConsumer(context.TODO(), consumerName) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if resp.Paused { + t.Fatalf("Consumer should not be paused") + } + if resp.PauseRemaining != time.Duration(0) { + t.Fatalf("PauseRemaining should be 0") + } + }) +}