Skip to content

Commit

Permalink
feat: add pause and resume jetstream consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Yordis Prieto <[email protected]>
  • Loading branch information
yordis committed Feb 27, 2024
1 parent 5b7ba3d commit 481ab36
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 15 deletions.
13 changes: 10 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@ module github.com/nats-io/nats.go
go 1.20

require (
github.com/klauspost/compress v1.17.2
github.com/golang/protobuf v1.5.3
github.com/klauspost/compress v1.17.6
github.com/nats-io/nats-server/v2 v2.10.11
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.3.0
golang.org/x/text v0.14.0
google.golang.org/protobuf v1.32.0
)

require (
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
)
44 changes: 32 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.11 h1:yKUiLVincZISpo3A4YljJQ+HfLltGAgoNNJl99KL8I0=
github.com/nats-io/nats-server/v2 v2.10.11/go.mod h1:dXtOqVWzbMTEj+tUyC/itXjJhW37xh0tUBrTAlqAfx8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
3 changes: 3 additions & 0 deletions jetstream/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const (
// apiConsumerDeleteT is used to delete consumers.
apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"

// apiConsumerPauseT is used to pause a consumer.
apiConsumerPauseT = "CONSUMER.PAUSE.%s.%s"

// apiConsumerListT is used to return all detailed consumer information
apiConsumerListT = "CONSUMER.LIST.%s"

Expand Down
34 changes: 34 additions & 0 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/nats-io/nuid"
)
Expand Down Expand Up @@ -317,6 +318,39 @@ func deleteConsumer(ctx context.Context, js *jetStream, stream, consumer string)
return nil
}

func pauseConsumer(ctx context.Context, js *jetStream, stream, consumer string, pauseUntil *time.Time) error {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
if err := validateConsumerName(consumer); err != nil {
return err
}
subject := apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerPauseT, stream, consumer))

var resp consumerPauseResponse
req, err := json.Marshal(consumerPauseRequest{
PauseUntil: pauseUntil,
})
if err != nil {
return err
}
if _, err := js.apiRequestJSON(ctx, subject, &resp, req); err != nil {
return err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
return ErrConsumerNotFound
}
return resp.Error
}
return nil
}

func resumeConsumer(ctx context.Context, js *jetStream, stream, consumer string) error {
return pauseConsumer(ctx, js, stream, consumer, nil)
}

func validateConsumerName(dur string) error {
if strings.Contains(dur, ".") {
return fmt.Errorf("%w: %q", ErrInvalidConsumerName, dur)
Expand Down
10 changes: 10 additions & 0 deletions jetstream/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ type (

// TimeStamp indicates when the info was gathered by the server.
TimeStamp time.Time `json:"ts"`

// Paused indicates whether the consumer is paused.
Paused bool `json:"paused,omitempty"`

// PauseRemaining contains the amount of time left until the consumer
// unpauses. It will only be non-zero if the consumer is currently paused.
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}

// ConsumerConfig is the configuration of a JetStream consumer.
Expand Down Expand Up @@ -217,6 +224,9 @@ type (
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil *time.Time `json:"pause_until,omitempty"`
}

// OrderedConsumerConfig is the configuration of an ordered JetStream
Expand Down
20 changes: 20 additions & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ type (
// DeleteConsumer removes a consumer with given name from a stream.
// If consumer does not exist, ErrConsumerNotFound is returned.
DeleteConsumer(ctx context.Context, stream string, consumer string) error

// PauseConsumer pauses a stream. If pauseUntil is provided, the stream will be paused until that time.
PauseConsumer(ctx context.Context, stream string, consumer string, pauseUntil *time.Time) error

// ResumeConsumer resumes a paused stream.
ResumeConsumer(ctx context.Context, stream string, consumer string) error
}

// StreamListOpt is a functional option for [StreamManager.ListStreams] and
Expand Down Expand Up @@ -769,6 +775,20 @@ func (js *jetStream) DeleteConsumer(ctx context.Context, stream string, name str
return deleteConsumer(ctx, js, stream, name)
}

func (js *jetStream) PauseConsumer(ctx context.Context, stream string, consumer string, pauseUntil *time.Time) error {
if err := validateStreamName(stream); err != nil {
return err
}
return pauseConsumer(ctx, js, stream, consumer, pauseUntil)
}

func (js *jetStream) ResumeConsumer(ctx context.Context, stream string, consumer string) error {
if err := validateStreamName(stream); err != nil {
return err
}
return resumeConsumer(ctx, js, stream, consumer)
}

func validateStreamName(stream string) error {
if stream == "" {
return ErrStreamNameRequired
Expand Down
27 changes: 27 additions & 0 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ type (
// If consumer does not exist, ErrConsumerNotFound is returned.
DeleteConsumer(ctx context.Context, consumer string) error

// PauseConsumer pauses a consumer.
PauseConsumer(ctx context.Context, consumer string, pauseUntil *time.Time) error

// ResumeConsumer resumes a consumer.
ResumeConsumer(ctx context.Context, consumer string) error

// ListConsumers returns ConsumerInfoLister enabling iterating over a
// channel of consumer infos.
ListConsumers(context.Context) ConsumerInfoLister
Expand Down Expand Up @@ -163,6 +169,17 @@ type (
Success bool `json:"success,omitempty"`
}

consumerPauseRequest struct {
PauseUntil *time.Time `json:"pause_until,omitempty"`
}

consumerPauseResponse struct {
apiResponse
Paused bool `json:"paused"`
PauseUntil time.Time `json:"pause_until"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}

// GetMsgOpt is a function setting options for [Stream.GetMsg]
GetMsgOpt func(*apiMsgGetRequest) error

Expand Down Expand Up @@ -297,6 +314,16 @@ func (s *stream) DeleteConsumer(ctx context.Context, name string) error {
return deleteConsumer(ctx, s.jetStream, s.name, name)
}

// PauseConsumer pauses a consumer.
func (s *stream) PauseConsumer(ctx context.Context, name string, pauseUntil *time.Time) error {
return pauseConsumer(ctx, s.jetStream, s.name, name, pauseUntil)
}

// ResumeConsumer resumes a consumer.
func (s *stream) ResumeConsumer(ctx context.Context, name string) error {
return resumeConsumer(ctx, s.jetStream, s.name, name)
}

// Info returns StreamInfo from the server.
func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
Expand Down
6 changes: 6 additions & 0 deletions jetstream/test/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func TestConsumerInfo(t *testing.T) {
if info.Config.Description != "test consumer" {
t.Fatalf("Invalid consumer description; expected: 'test consumer'; got: %s", info.Config.Description)
}
if info.Paused != false {
t.Fatalf("Consumer should not be paused")
}
if info.PauseRemaining != 0 {
t.Fatalf("Consumer should not be paused")
}

// update consumer and see if info is updated
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Expand Down

0 comments on commit 481ab36

Please sign in to comment.