Skip to content

Commit

Permalink
feat: add pause and resume jetstream consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Yordis Prieto <[email protected]>
  • Loading branch information
yordis committed Feb 28, 2024
1 parent a04478f commit 5e31e66
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 10 deletions.
8 changes: 4 additions & 4 deletions go_test.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module github.com/nats-io/nats.go

go 1.19
go 1.20

require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.6
github.com/nats-io/nats-server/v2 v2.10.11
github.com/klauspost/compress v1.17.7
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240227164423-1d1d982f0538
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.3.0
Expand All @@ -15,7 +15,7 @@ require (

require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
github.com/nats-io/jwt/v2 v2.5.5 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.11 h1:yKUiLVincZISpo3A4YljJQ+HfLltGAgoNNJl99KL8I0=
github.com/nats-io/nats-server/v2 v2.10.11/go.mod h1:dXtOqVWzbMTEj+tUyC/itXjJhW37xh0tUBrTAlqAfx8=
github.com/nats-io/jwt/v2 v2.5.5 h1:ROfXb50elFq5c9+1ztaUbdlrArNFl2+fQWP6B8HGEq4=
github.com/nats-io/jwt/v2 v2.5.5/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240227164423-1d1d982f0538 h1:z0+WWP9+JS43uYJXUJQApG1HyJLOXzFdXks1eZMDlE0=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240227164423-1d1d982f0538/go.mod h1:J0sPAPoyG5tzqLha88PgAnG4dib7rxHVT/Fka8H6JBQ=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
3 changes: 3 additions & 0 deletions jetstream/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const (
// apiConsumerDeleteT is used to delete consumers.
apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"

// apiConsumerPauseT is used to pause a consumer.
apiConsumerPauseT = "CONSUMER.PAUSE.%s.%s"

// apiConsumerListT is used to return all detailed consumer information
apiConsumerListT = "CONSUMER.LIST.%s"

Expand Down
34 changes: 34 additions & 0 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/nats-io/nuid"
)
Expand Down Expand Up @@ -317,6 +318,39 @@ func deleteConsumer(ctx context.Context, js *jetStream, stream, consumer string)
return nil
}

func pauseConsumer(ctx context.Context, js *jetStream, stream, consumer string, pauseUntil *time.Time) error {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
if err := validateConsumerName(consumer); err != nil {
return err
}
subject := apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerPauseT, stream, consumer))

var resp consumerPauseResponse
req, err := json.Marshal(consumerPauseRequest{
PauseUntil: pauseUntil,
})
if err != nil {
return err
}
if _, err := js.apiRequestJSON(ctx, subject, &resp, req); err != nil {
return err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
return ErrConsumerNotFound
}
return resp.Error
}
return nil
}

func resumeConsumer(ctx context.Context, js *jetStream, stream, consumer string) error {
return pauseConsumer(ctx, js, stream, consumer, nil)
}

func validateConsumerName(dur string) error {
if strings.Contains(dur, ".") {
return fmt.Errorf("%w: %q", ErrInvalidConsumerName, dur)
Expand Down
10 changes: 10 additions & 0 deletions jetstream/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ type (

// TimeStamp indicates when the info was gathered by the server.
TimeStamp time.Time `json:"ts"`

// Paused indicates whether the consumer is paused.
Paused bool `json:"paused,omitempty"`

// PauseRemaining contains the amount of time left until the consumer
// unpauses. It will only be non-zero if the consumer is currently paused.
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}

// ConsumerConfig is the configuration of a JetStream consumer.
Expand Down Expand Up @@ -217,6 +224,9 @@ type (
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil *time.Time `json:"pause_until,omitempty"`
}

// OrderedConsumerConfig is the configuration of an ordered JetStream
Expand Down
20 changes: 20 additions & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ type (
// DeleteConsumer removes a consumer with given name from a stream.
// If consumer does not exist, ErrConsumerNotFound is returned.
DeleteConsumer(ctx context.Context, stream string, consumer string) error

// PauseConsumer pauses a consumer until the given time.
PauseConsumer(ctx context.Context, stream string, consumer string, pauseUntil time.Time) error

// ResumeConsumer resumes a paused consumer.
ResumeConsumer(ctx context.Context, stream string, consumer string) error
}

// StreamListOpt is a functional option for [StreamManager.ListStreams] and
Expand Down Expand Up @@ -769,6 +775,20 @@ func (js *jetStream) DeleteConsumer(ctx context.Context, stream string, name str
return deleteConsumer(ctx, js, stream, name)
}

func (js *jetStream) PauseConsumer(ctx context.Context, stream string, consumer string, pauseUntil time.Time) error {
if err := validateStreamName(stream); err != nil {
return err
}
return pauseConsumer(ctx, js, stream, consumer, &pauseUntil)
}

func (js *jetStream) ResumeConsumer(ctx context.Context, stream string, consumer string) error {
if err := validateStreamName(stream); err != nil {
return err
}
return resumeConsumer(ctx, js, stream, consumer)
}

func validateStreamName(stream string) error {
if stream == "" {
return ErrStreamNameRequired
Expand Down
27 changes: 27 additions & 0 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ type (
// If consumer does not exist, ErrConsumerNotFound is returned.
DeleteConsumer(ctx context.Context, consumer string) error

// PauseConsumer pauses a consumer.
PauseConsumer(ctx context.Context, consumer string, pauseUntil time.Time) error

// ResumeConsumer resumes a consumer.
ResumeConsumer(ctx context.Context, consumer string) error

// ListConsumers returns ConsumerInfoLister enabling iterating over a
// channel of consumer infos.
ListConsumers(context.Context) ConsumerInfoLister
Expand Down Expand Up @@ -163,6 +169,17 @@ type (
Success bool `json:"success,omitempty"`
}

consumerPauseRequest struct {
PauseUntil *time.Time `json:"pause_until,omitempty"`
}

consumerPauseResponse struct {
apiResponse
Paused bool `json:"paused"`
PauseUntil time.Time `json:"pause_until"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}

// GetMsgOpt is a function setting options for [Stream.GetMsg]
GetMsgOpt func(*apiMsgGetRequest) error

Expand Down Expand Up @@ -297,6 +314,16 @@ func (s *stream) DeleteConsumer(ctx context.Context, name string) error {
return deleteConsumer(ctx, s.jetStream, s.name, name)
}

// PauseConsumer pauses a consumer.
func (s *stream) PauseConsumer(ctx context.Context, name string, pauseUntil time.Time) error {
return pauseConsumer(ctx, s.jetStream, s.name, name, &pauseUntil)
}

// ResumeConsumer resumes a consumer.
func (s *stream) ResumeConsumer(ctx context.Context, name string) error {
return resumeConsumer(ctx, s.jetStream, s.name, name)
}

// Info returns StreamInfo from the server.
func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
Expand Down
9 changes: 9 additions & 0 deletions jetstream/test/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ func TestConsumerInfo(t *testing.T) {
if info.Config.Description != "test consumer" {
t.Fatalf("Invalid consumer description; expected: 'test consumer'; got: %s", info.Config.Description)
}
if info.Config.PauseUntil != nil {
t.Fatalf("Consumer should not be paused")
}
if info.Paused != false {
t.Fatalf("Consumer should not be paused")
}
if info.PauseRemaining != 0 {
t.Fatalf("Consumer should not be paused")
}

// update consumer and see if info is updated
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Expand Down
135 changes: 135 additions & 0 deletions jetstream/test/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1473,3 +1473,138 @@ func TestPurgeStream(t *testing.T) {
})
}
}

func TestPauseConsumer(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)

nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

defer nc.Close()

s, err := js.CreateStream(context.TODO(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

t.Run("create a paused consumer", func(t *testing.T) {
const consumerName = "durr"
pauseUntil := time.Now().Add(1 * time.Minute)
consumer, err := s.CreateOrUpdateConsumer(context.TODO(), jetstream.ConsumerConfig{
Durable: consumerName,
AckPolicy: jetstream.AckAllPolicy,
Description: "desc",
PauseUntil: &pauseUntil,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

info, err := consumer.Info(context.TODO())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !info.Paused {
t.Fatalf("Consumer should be paused")
}
if info.PauseRemaining <= time.Duration(0) {
t.Fatalf("PauseRemaining should be greater than 0")
}
})

t.Run("pausing a consumer that does not exists", func(t *testing.T) {
const consumerName = "durr1"
pauseUntil := time.Now().Add(1 * time.Minute)
err := s.PauseConsumer(context.TODO(), consumerName, pauseUntil)
if err == nil {
t.Fatalf("Expected error; got: %v", err)
}
if !errors.Is(err, jetstream.ErrConsumerNotFound) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrConsumerNotFound, err)
}
})

t.Run("pausing consumer", func(t *testing.T) {
const consumerName = "durr2"
consumer, err := s.CreateOrUpdateConsumer(context.TODO(), jetstream.ConsumerConfig{
Durable: consumerName,
AckPolicy: jetstream.AckAllPolicy,
Description: "desc",
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

info, err := consumer.Info(context.TODO())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if info.Paused {
t.Fatalf("Consumer should not be paused")
}

pauseUntil := time.Now().Add(1 * time.Minute)
err = s.PauseConsumer(context.TODO(), consumerName, pauseUntil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

info, err = consumer.Info(context.TODO())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if !info.Paused {
t.Fatalf("Consumer should be paused")
}
if info.PauseRemaining <= time.Duration(0) {
t.Fatalf("PauseRemaining should be greater than 0")
}
})

t.Run("resuming consumer", func(t *testing.T) {
const consumerName = "durr3"
pauseUntil := time.Now().Add(20 * time.Minute)
consumer, err := s.CreateOrUpdateConsumer(context.TODO(), jetstream.ConsumerConfig{
Durable: consumerName,
AckPolicy: jetstream.AckAllPolicy,
Description: "desc",
PauseUntil: &pauseUntil,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

info, err := consumer.Info(context.TODO())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !info.Paused {
t.Fatalf("Consumer should be paused")
}

err = s.ResumeConsumer(context.TODO(), consumerName)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

info, err = consumer.Info(context.TODO())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if info.Paused {
t.Fatalf("Consumer should not be paused")
}
if info.PauseRemaining != time.Duration(0) {
t.Fatalf("PauseRemaining should be 0")
}
})
}

0 comments on commit 5e31e66

Please sign in to comment.