Skip to content

Commit

Permalink
feat: add pause and resume jetstream consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Yordis Prieto <[email protected]>
  • Loading branch information
yordis committed Feb 27, 2024
1 parent 5b7ba3d commit e17e2a9
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 23 deletions.
61 changes: 48 additions & 13 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Contributing

Thanks for your interest in contributing! This document contains `nats-io/nats.go` specific contributing details. If you are a first-time contributor, please refer to the general [NATS Contributor Guide](https://nats.io/contributing/) to get a comprehensive overview of contributing to the NATS project.
Thanks for your interest in contributing! This document contains `nats-io/nats.go` specific contributing details. If you
are a first-time contributor, please refer to the general [NATS Contributor Guide](https://nats.io/contributing/) to get
a comprehensive overview of contributing to the NATS project.

## Getting started

Expand All @@ -10,36 +12,69 @@ There are three general ways you can contribute to this repo:
- Reporting a bug or regression
- Contributing changes to the source code

For the first two, refer to the [GitHub Issues](https://github.com/nats-io/nats.go/issues/new/choose) which guides you through the available options along with the needed information to collect.
For the first two, refer to the [GitHub Issues](https://github.com/nats-io/nats.go/issues/new/choose) which guides you
through the available options along with the needed information to collect.

## Contributing changes

_Prior to opening a pull request, it is recommended to open an issue first to ensure the maintainers can review intended changes. Exceptions to this rule include fixing non-functional source such as code comments, documentation or other supporting files._
_Prior to opening a pull request, it is recommended to open an issue first to ensure the maintainers can review intended
changes. Exceptions to this rule include fixing non-functional source such as code comments, documentation or other
supporting files._

Proposing source code changes is done through GitHub's standard pull request workflow.

If your branch is a work-in-progress then please start by creating your pull requests as draft, by clicking the down-arrow next to the `Create pull request` button and instead selecting `Create draft pull request`.
If your branch is a work-in-progress then please start by creating your pull requests as draft, by clicking the
down-arrow next to the `Create pull request` button and instead selecting `Create draft pull request`.

This will defer the automatic process of requesting a review from the NATS team and significantly reduces noise until you are ready. Once you are happy, you can click the `Ready for review` button.
This will defer the automatic process of requesting a review from the NATS team and significantly reduces noise until
you are ready. Once you are happy, you can click the `Ready for review` button.

### Guidelines

A good pull request includes:

- A high-level description of the changes, including links to any issues that are related by adding comments like `Resolves #NNN` to your description. See [Linking a Pull Request to an Issue](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue) for more information.
- An up-to-date parent commit. Please make sure you are pulling in the latest `main` branch and rebasing your work on top of it, i.e. `git rebase main`.
- Unit tests where appropriate. Bug fixes will benefit from the addition of regression tests. New features will not be accepted without suitable test coverage!
- No more commits than necessary. Sometimes having multiple commits is useful for telling a story or isolating changes from one another, but please squash down any unnecessary commits that may just be for clean-up, comments or small changes.
- No additional external dependencies that aren't absolutely essential. Please do everything you can to avoid pulling in additional libraries/dependencies into `go.mod` as we will be very critical of these.
- A high-level description of the changes, including links to any issues that are related by adding comments
like `Resolves #NNN` to your description.
See [Linking a Pull Request to an Issue](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue)
for more information.
- An up-to-date parent commit. Please make sure you are pulling in the latest `main` branch and rebasing your work on
top of it, i.e. `git rebase main`.
- Unit tests where appropriate. Bug fixes will benefit from the addition of regression tests. New features will not be
accepted without suitable test coverage!
- No more commits than necessary. Sometimes having multiple commits is useful for telling a story or isolating changes
from one another, but please squash down any unnecessary commits that may just be for clean-up, comments or small
changes.
- No additional external dependencies that aren't absolutely essential. Please do everything you can to avoid pulling in
additional libraries/dependencies into `go.mod` as we will be very critical of these.

### Sign-off

In order to accept a contribution, you will first need to certify that the contribution is your original work and that you license the work to the project under the [Apache-2.0 license](https://github.com/nats-io/nats.go/blob/main/LICENSE).
In order to accept a contribution, you will first need to certify that the contribution is your original work and that
you license the work to the project under
the [Apache-2.0 license](https://github.com/nats-io/nats.go/blob/main/LICENSE).

This is done by using `Signed-off-by` statements, which should appear in **both** your commit messages and your PR description. Please note that we can only accept sign-offs under a legal name. Nicknames and aliases are not permitted.
This is done by using `Signed-off-by` statements, which should appear in **both** your commit messages and your PR
description. Please note that we can only accept sign-offs under a legal name. Nicknames and aliases are not permitted.

To perform a sign-off with `git`, use `git commit -s` (or `--signoff`).

## Get help

If you have questions about the contribution process, please start a [GitHub discussion](https://github.com/nats-io/nats.go/discussions), join the [NATS Slack](https://slack.nats.io/), or send your question to the [NATS Google Group](https://groups.google.com/forum/#!forum/natsio).
If you have questions about the contribution process, please start
a [GitHub discussion](https://github.com/nats-io/nats.go/discussions), join the [NATS Slack](https://slack.nats.io/), or
send your question to the [NATS Google Group](https://groups.google.com/forum/#!forum/natsio).

## Testing

You should use `go_test.mod` to manage your testing dependencies. Please use the following command to update your
`go.mod` file:

```shell
go mod tidy -modfile=go_test.mod
```

We recommend using the following command to run your tests as well,

```shell
go test ./... -modfile=go_test.mod -v
```
13 changes: 9 additions & 4 deletions go_test.mod
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
module github.com/nats-io/nats.go

go 1.19
go 1.20

require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.6
github.com/nats-io/nats-server/v2 v2.10.11
github.com/klauspost/compress v1.17.7
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240227164423-1d1d982f0538
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
github.com/stretchr/testify v1.8.0
go.uber.org/goleak v1.3.0
golang.org/x/text v0.14.0
google.golang.org/protobuf v1.23.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
github.com/nats-io/jwt/v2 v2.5.5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
27 changes: 21 additions & 6 deletions go_test.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
Expand All @@ -10,20 +13,28 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.11 h1:yKUiLVincZISpo3A4YljJQ+HfLltGAgoNNJl99KL8I0=
github.com/nats-io/nats-server/v2 v2.10.11/go.mod h1:dXtOqVWzbMTEj+tUyC/itXjJhW37xh0tUBrTAlqAfx8=
github.com/nats-io/jwt/v2 v2.5.5 h1:ROfXb50elFq5c9+1ztaUbdlrArNFl2+fQWP6B8HGEq4=
github.com/nats-io/jwt/v2 v2.5.5/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240227164423-1d1d982f0538 h1:z0+WWP9+JS43uYJXUJQApG1HyJLOXzFdXks1eZMDlE0=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240227164423-1d1d982f0538/go.mod h1:J0sPAPoyG5tzqLha88PgAnG4dib7rxHVT/Fka8H6JBQ=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
Expand All @@ -44,4 +55,8 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
3 changes: 3 additions & 0 deletions jetstream/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const (
// apiConsumerDeleteT is used to delete consumers.
apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"

// apiConsumerPauseT is used to pause a consumer.
apiConsumerPauseT = "CONSUMER.PAUSE.%s.%s"

// apiConsumerListT is used to return all detailed consumer information
apiConsumerListT = "CONSUMER.LIST.%s"

Expand Down
37 changes: 37 additions & 0 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/nats-io/nuid"
)
Expand Down Expand Up @@ -317,6 +318,42 @@ func deleteConsumer(ctx context.Context, js *jetStream, stream, consumer string)
return nil
}

func pauseConsumer(ctx context.Context, js *jetStream, stream, consumer string, pauseUntil *time.Time) error {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
if err := validateConsumerName(consumer); err != nil {
return err
}
subject := apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerPauseT, stream, consumer))

var resp consumerPauseResponse
req, err := json.Marshal(consumerPauseRequest{
PauseUntil: pauseUntil,
})
if err != nil {
return err
}
if _, err := js.apiRequestJSON(ctx, subject, &resp, req); err != nil {
return err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
return ErrConsumerNotFound
}
return resp.Error
}
return nil
}

func resumeConsumer(ctx context.Context, js *jetStream, stream, consumer string) error {
// TODO: I do not like that I am using time.Now() here,
// it used to be that I passed `nil` but it did not work as expected
ts := time.Now()
return pauseConsumer(ctx, js, stream, consumer, &ts)
}

func validateConsumerName(dur string) error {
if strings.Contains(dur, ".") {
return fmt.Errorf("%w: %q", ErrInvalidConsumerName, dur)
Expand Down
10 changes: 10 additions & 0 deletions jetstream/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ type (

// TimeStamp indicates when the info was gathered by the server.
TimeStamp time.Time `json:"ts"`

// Paused indicates whether the consumer is paused.
Paused bool `json:"paused,omitempty"`

// PauseRemaining contains the amount of time left until the consumer
// unpauses. It will only be non-zero if the consumer is currently paused.
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}

// ConsumerConfig is the configuration of a JetStream consumer.
Expand Down Expand Up @@ -217,6 +224,9 @@ type (
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil *time.Time `json:"pause_until,omitempty"`
}

// OrderedConsumerConfig is the configuration of an ordered JetStream
Expand Down
20 changes: 20 additions & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ type (
// DeleteConsumer removes a consumer with given name from a stream.
// If consumer does not exist, ErrConsumerNotFound is returned.
DeleteConsumer(ctx context.Context, stream string, consumer string) error

// PauseConsumer pauses a stream. If pauseUntil is provided, the stream will be paused until that time.
PauseConsumer(ctx context.Context, stream string, consumer string, pauseUntil *time.Time) error

// ResumeConsumer resumes a paused stream.
ResumeConsumer(ctx context.Context, stream string, consumer string) error
}

// StreamListOpt is a functional option for [StreamManager.ListStreams] and
Expand Down Expand Up @@ -769,6 +775,20 @@ func (js *jetStream) DeleteConsumer(ctx context.Context, stream string, name str
return deleteConsumer(ctx, js, stream, name)
}

func (js *jetStream) PauseConsumer(ctx context.Context, stream string, consumer string, pauseUntil *time.Time) error {
if err := validateStreamName(stream); err != nil {
return err
}
return pauseConsumer(ctx, js, stream, consumer, pauseUntil)
}

func (js *jetStream) ResumeConsumer(ctx context.Context, stream string, consumer string) error {
if err := validateStreamName(stream); err != nil {
return err
}
return resumeConsumer(ctx, js, stream, consumer)
}

func validateStreamName(stream string) error {
if stream == "" {
return ErrStreamNameRequired
Expand Down
27 changes: 27 additions & 0 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ type (
// If consumer does not exist, ErrConsumerNotFound is returned.
DeleteConsumer(ctx context.Context, consumer string) error

// PauseConsumer pauses a consumer.
PauseConsumer(ctx context.Context, consumer string, pauseUntil *time.Time) error

// ResumeConsumer resumes a consumer.
ResumeConsumer(ctx context.Context, consumer string) error

// ListConsumers returns ConsumerInfoLister enabling iterating over a
// channel of consumer infos.
ListConsumers(context.Context) ConsumerInfoLister
Expand Down Expand Up @@ -163,6 +169,17 @@ type (
Success bool `json:"success,omitempty"`
}

consumerPauseRequest struct {
PauseUntil *time.Time `json:"pause_until,omitempty"`
}

consumerPauseResponse struct {
apiResponse
Paused bool `json:"paused"`
PauseUntil time.Time `json:"pause_until"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}

// GetMsgOpt is a function setting options for [Stream.GetMsg]
GetMsgOpt func(*apiMsgGetRequest) error

Expand Down Expand Up @@ -297,6 +314,16 @@ func (s *stream) DeleteConsumer(ctx context.Context, name string) error {
return deleteConsumer(ctx, s.jetStream, s.name, name)
}

// PauseConsumer pauses a consumer.
func (s *stream) PauseConsumer(ctx context.Context, name string, pauseUntil *time.Time) error {
return pauseConsumer(ctx, s.jetStream, s.name, name, pauseUntil)
}

// ResumeConsumer resumes a consumer.
func (s *stream) ResumeConsumer(ctx context.Context, name string) error {
return resumeConsumer(ctx, s.jetStream, s.name, name)
}

// Info returns StreamInfo from the server.
func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
Expand Down
9 changes: 9 additions & 0 deletions jetstream/test/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ func TestConsumerInfo(t *testing.T) {
if info.Config.Description != "test consumer" {
t.Fatalf("Invalid consumer description; expected: 'test consumer'; got: %s", info.Config.Description)
}
if info.Config.PauseUntil != nil {
t.Fatalf("Consumer should not be paused")
}
if info.Paused != false {
t.Fatalf("Consumer should not be paused")
}
if info.PauseRemaining != 0 {
t.Fatalf("Consumer should not be paused")
}

// update consumer and see if info is updated
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Expand Down
Loading

0 comments on commit e17e2a9

Please sign in to comment.