Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds the ability to pause and resume consumers #522

Merged
merged 1 commit into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions api/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
JSApiRequestNext = "$JS.API.CONSUMER.MSG.NEXT.*.*"
JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
JSApiconsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
JSMetricConsumerAckPre = JSMetricPrefix + ".CONSUMER.ACK"
JSAdvisoryConsumerMaxDeliveryExceedPre = JSAdvisoryPrefix + ".CONSUMER.MAX_DELIVERIES"
)
Expand Down Expand Up @@ -150,6 +151,19 @@ type JSApiConsumerLeaderStepDownResponse struct {
Success bool `json:"success,omitempty"`
}

// io.nats.jetstream.api.v1.consumer_pause_request
type JSApiConsumerPauseRequest struct {
PauseUntil time.Time `json:"pause_until,omitempty"`
}

// io.nats.jetstream.api.v1.consumer_pause_response
type JSApiConsumerPauseResponse struct {
JSApiResponse
Paused bool `json:"paused"`
PauseUntil time.Time `json:"pause_until"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}

type AckPolicy int

const (
Expand Down Expand Up @@ -352,6 +366,9 @@ type ConsumerConfig struct {
// Metadata is additional metadata for the Consumer.
Metadata map[string]string `json:"metadata,omitempty"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil time.Time `json:"pause_until,omitempty"`

// Don't add to general clients.
Direct bool `json:"direct,omitempty"`
}
Expand All @@ -377,6 +394,8 @@ type ConsumerInfo struct {
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
Paused bool `json:"paused,omitempty"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
TimeStamp time.Time `json:"ts"`
}

Expand Down
3 changes: 3 additions & 0 deletions api/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func main() {
&schema{P: "jetstream/advisory/v1/terminated.json", St: "jsadvisory.JSConsumerDeliveryTerminatedAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/stream_action.json", St: "jsadvisory.JSStreamActionAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/consumer_action.json", St: "jsadvisory.JSConsumerActionAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/consumer_pause.json", St: "jsadvisory.JSConsumerPauseAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/snapshot_create.json", St: "jsadvisory.JSSnapshotCreateAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/snapshot_complete.json", St: "jsadvisory.JSSnapshotCompleteAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/restore_create.json", St: "jsadvisory.JSRestoreCreateAdvisoryV1"},
Expand All @@ -223,6 +224,8 @@ func main() {
&schema{P: "jetstream/api/v1/consumer_names_response.json", St: "JSApiConsumerNamesResponse"},
&schema{P: "jetstream/api/v1/consumer_getnext_request.json", St: "JSApiConsumerGetNextRequest"},
&schema{P: "jetstream/api/v1/consumer_leader_stepdown_response.json", St: "JSApiConsumerLeaderStepDownResponse"},
&schema{P: "jetstream/api/v1/consumer_pause_request.json", St: "JSApiConsumerPauseRequest"},
&schema{P: "jetstream/api/v1/consumer_pause_response.json", St: "JSApiConsumerPauseResponse"},
&schema{P: "jetstream/api/v1/stream_create_request.json", St: "JSApiStreamCreateRequest"},
&schema{P: "jetstream/api/v1/stream_create_response.json", St: "JSApiStreamCreateResponse"},
&schema{P: "jetstream/api/v1/stream_delete_response.json", St: "JSApiStreamDeleteResponse"},
Expand Down
42 changes: 42 additions & 0 deletions api/jetstream/advisory/consumer_pause.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package advisory

import (
"time"

"github.com/nats-io/jsm.go/api/event"
)

// JSConsumerPauseAdvisoryV1 indicates that a consumer was paused or unpaused
type JSConsumerPauseAdvisoryV1 struct {
event.NATSEvent

Stream string `json:"stream"`
Consumer string `json:"consumer"`
Paused bool `json:"paused"`
PauseUntil time.Time `json:"pause_until,omitempty"`
Domain string `json:"domain,omitempty"`
}

func init() {
err := event.RegisterTextCompactTemplate("io.nats.jetstream.advisory.v1.consumer_pause", `{{ .Time | ShortTime }} [Consumer Pause] Consumer: {{ .Stream }} > {{ .Consumer }} Paused: {{ .Paused }}{{ if .Paused }} until {{ .PauseUntil }}{{ end }}`)
if err != nil {
panic(err)
}

err = event.RegisterTextExtendedTemplate("io.nats.jetstream.advisory.v1.consumer_pause", `
[{{ .Time | ShortTime }}] [{{ .ID }}] Consumer Pause

Stream: {{ .Stream }}
Consumer: {{ .Consumer }}
Paused: {{ .Paused }}
{{- if .Paused }}
Until: {{ .PauseUntil }}
{{- end }}
{{- if .Domain }}
Domain: {{ .Domain }}
{{- end }}
`)
if err != nil {
panic(err)
}
}
61 changes: 60 additions & 1 deletion api/schemas_generated.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// auto generated 2024-02-08 10:37:12.960275 +0100 CET m=+0.008558457
// auto generated 2024-02-13 15:58:31.227879 +0100 CET m=+0.013734209

package api

Expand All @@ -22,6 +22,7 @@ var schemaTypes = map[string]func() any{
"io.nats.jetstream.advisory.v1.terminated": func() any { return &jsadvisory.JSConsumerDeliveryTerminatedAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.stream_action": func() any { return &jsadvisory.JSStreamActionAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.consumer_action": func() any { return &jsadvisory.JSConsumerActionAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.consumer_pause": func() any { return &jsadvisory.JSConsumerPauseAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.snapshot_create": func() any { return &jsadvisory.JSSnapshotCreateAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.snapshot_complete": func() any { return &jsadvisory.JSSnapshotCompleteAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.restore_create": func() any { return &jsadvisory.JSRestoreCreateAdvisoryV1{} },
Expand All @@ -48,6 +49,8 @@ var schemaTypes = map[string]func() any{
"io.nats.jetstream.api.v1.consumer_names_response": func() any { return &JSApiConsumerNamesResponse{} },
"io.nats.jetstream.api.v1.consumer_getnext_request": func() any { return &JSApiConsumerGetNextRequest{} },
"io.nats.jetstream.api.v1.consumer_leader_stepdown_response": func() any { return &JSApiConsumerLeaderStepDownResponse{} },
"io.nats.jetstream.api.v1.consumer_pause_request": func() any { return &JSApiConsumerPauseRequest{} },
"io.nats.jetstream.api.v1.consumer_pause_response": func() any { return &JSApiConsumerPauseResponse{} },
"io.nats.jetstream.api.v1.stream_create_request": func() any { return &JSApiStreamCreateRequest{} },
"io.nats.jetstream.api.v1.stream_create_response": func() any { return &JSApiStreamCreateResponse{} },
"io.nats.jetstream.api.v1.stream_delete_response": func() any { return &JSApiStreamDeleteResponse{} },
Expand Down Expand Up @@ -480,6 +483,62 @@ func (t JSApiConsumerLeaderStepDownResponse) Schema() ([]byte, error) {
return scfs.Load(f)
}

// Validate performs a JSON Schema validation of the configuration
func (t JSApiConsumerPauseRequest) Validate(v ...StructValidator) (valid bool, errors []string) {
if len(v) == 0 || v[0] == nil {
return true, nil
}

return v[0].ValidateStruct(t, t.SchemaType())
}

// SchemaType is the NATS schema type io.nats.jetstream.api.v1.consumer_pause_request
func (t JSApiConsumerPauseRequest) SchemaType() string {
return "io.nats.jetstream.api.v1.consumer_pause_request"
}

// SchemaID is the url to the JSON Schema for JetStream Consumer Configuration
func (t JSApiConsumerPauseRequest) SchemaID() string {
return "https://raw.githubusercontent.com/nats-io/jsm.go/master/schemas/jetstream/api/v1/consumer_pause_request.json"
}

// Schema is a JSON Schema document for the JetStream Consumer Configuration
func (t JSApiConsumerPauseRequest) Schema() ([]byte, error) {
f, err := SchemaFileForType(t.SchemaType())
if err != nil {
return nil, err
}
return scfs.Load(f)
}

// Validate performs a JSON Schema validation of the configuration
func (t JSApiConsumerPauseResponse) Validate(v ...StructValidator) (valid bool, errors []string) {
if len(v) == 0 || v[0] == nil {
return true, nil
}

return v[0].ValidateStruct(t, t.SchemaType())
}

// SchemaType is the NATS schema type io.nats.jetstream.api.v1.consumer_pause_response
func (t JSApiConsumerPauseResponse) SchemaType() string {
return "io.nats.jetstream.api.v1.consumer_pause_response"
}

// SchemaID is the url to the JSON Schema for JetStream Consumer Configuration
func (t JSApiConsumerPauseResponse) SchemaID() string {
return "https://raw.githubusercontent.com/nats-io/jsm.go/master/schemas/jetstream/api/v1/consumer_pause_response.json"
}

// Schema is a JSON Schema document for the JetStream Consumer Configuration
func (t JSApiConsumerPauseResponse) Schema() ([]byte, error) {
f, err := SchemaFileForType(t.SchemaType())
if err != nil {
return nil, err
}
return scfs.Load(f)
}

// Validate performs a JSON Schema validation of the configuration
func (t JSApiStreamCreateRequest) Validate(v ...StructValidator) (valid bool, errors []string) {
if len(v) == 0 || v[0] == nil {
Expand Down
45 changes: 45 additions & 0 deletions consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,13 @@ func ConsumerMetadata(meta map[string]string) ConsumerOption {
}
}

func PauseUntil(deadline time.Time) ConsumerOption {
return func(o *api.ConsumerConfig) error {
o.PauseUntil = deadline
return nil
}
}

// UpdateConfiguration updates the consumer configuration
// At present the description, ack wait, max deliver, sample frequency, max ack pending, max waiting and header only settings can be changed
func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error {
Expand Down Expand Up @@ -888,6 +895,43 @@ func (c *Consumer) LeaderStepDown() error {
return nil
}

// Pause requests a consumer be paused until the deadline, if it fails to pause an error is returned.
//
// A common reason for failures is when a time is supplied that is in the past from the perspective of the server
func (c *Consumer) Pause(deadline time.Time) (*api.JSApiConsumerPauseResponse, error) {
var resp *api.JSApiConsumerPauseResponse
req := api.JSApiConsumerPauseRequest{
PauseUntil: deadline,
}

err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiconsumerPauseT, c.StreamName(), c.Name()), &req, &resp)
if err != nil {
return nil, err
}

if !resp.Paused {
return nil, fmt.Errorf("pause request failed, perhaps due to a time in the past")
}

return resp, nil
}

// Resume requests the server resumes a paused consumer
func (c *Consumer) Resume() error {
var resp *api.JSApiConsumerPauseResponse

err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiconsumerPauseT, c.StreamName(), c.Name()), nil, &resp)
if err != nil {
return err
}

if resp.Paused {
return fmt.Errorf("pause request failed for an unknown reason")
}

return nil
}

func (c *Consumer) Name() string { return c.name }
func (c *Consumer) IsSampled() bool { return c.SampleFrequency() != "" }
func (c *Consumer) IsPullMode() bool { return c.cfg.DeliverSubject == "" }
Expand Down Expand Up @@ -922,6 +966,7 @@ func (c *Consumer) InactiveThreshold() time.Duration { return c.cfg.InactiveThre
func (c *Consumer) Replicas() int { return c.cfg.Replicas }
func (c *Consumer) Metadata() map[string]string { return c.cfg.Metadata }
func (c *Consumer) MemoryStorage() bool { return c.cfg.MemoryStorage }
func (c *Consumer) PauseUntil() time.Time { return c.cfg.PauseUntil }
func (c *Consumer) StartTime() time.Time {
if c.cfg.OptStartTime == nil {
return time.Time{}
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/expr-lang/expr v1.15.8
github.com/google/go-cmp v0.6.0
github.com/klauspost/compress v1.17.5
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88
github.com/nats-io/nats.go v1.32.0
github.com/klauspost/compress v1.17.6
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba
github.com/nats-io/nats.go v1.33.1
github.com/nats-io/nuid v1.0.1
golang.org/x/net v0.20.0
golang.org/x/text v0.14.0
Expand All @@ -18,10 +18,10 @@ require (
require (
github.com/kr/pretty v0.1.0 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
github.com/nats-io/jwt/v2 v2.5.4 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,34 @@ github.com/expr-lang/expr v1.15.8 h1:FL8+d3rSSP4tmK9o+vKfSMqqpGL8n15pEPiHcnBpxoI
github.com/expr-lang/expr v1.15.8/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/klauspost/compress v1.17.5 h1:d4vBd+7CHydUqpFBgUEKkSdtSugf9YFmSkvUYPquI5E=
github.com/klauspost/compress v1.17.5/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88 h1:mQUXBh1zwlTogpLmb3F8wJC/OrJlgQ2j76LD1BVHp64=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88/go.mod h1:/TE61Dos8NlwZnjzyE3ZlOnM6dgl7tf937dnf4VclrA=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/jwt/v2 v2.5.4 h1:Bz+drKl2GbE30fxTOtb0NYl1BQ5RwZ+Zcqkg3mR5bbI=
github.com/nats-io/jwt/v2 v2.5.4/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba h1:idIfFiRzXv2wHFpxHH4nSoPtg7UVr4vQPB1NraU0D4c=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba/go.mod h1:Co2t9J1pk4WXyMZiNFkLjFiD7hKE/jjsXtDWCyLfcgw=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
Expand Down
6 changes: 5 additions & 1 deletion natscontext/testdata/nats/context/user_pass_token_creds.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@
"jetstream_event_prefix": "",
"inbox_prefix": "",
"user_jwt": "",
"color_scheme": ""
"color_scheme": "",
"tls_first": false,
"windows_cert_store": "",
"windows_cert_match_by": "",
"windows_cert_match": ""
}
50 changes: 50 additions & 0 deletions schema_source/jetstream/advisory/v1/consumer_pause.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://nats.io/schemas/jetstream/advisory/v1/consumer_pause.json",
"description": "An Advisory sent when consumer is paused or resumed",
"title": "io.nats.jetstream.advisory.v1.consumer_pause",
"type": "object",
"required": [
"type",
"id",
"timestamp",
"stream",
"consumer",
"paused"
],
"additionalProperties": false,
"properties": {
"type": {
"type": "string",
"const": "io.nats.jetstream.advisory.v1.consumer_pause"
},
"id": {
"type": "string",
"description": "Unique correlation ID for this event"
},
"timestamp": {
"type": "string",
"description": "The time this event was created in RFC3339 format"
},
"stream": {
"type": "string",
"description": "The name of the Stream the Consumer belongs to"
},
"consumer": {
"type": "string",
"description": "The name of the Consumer that elected a new leader"
},
"paused": {
"type": "boolean",
"description": "Indicates the consumer is paused"
},
"pause_until": {
"description": "When paused the time the consumer will be unpaused, RFC3339 format",
"type": "string"
},
"domain": {
"description": "The domain hosting the Stream and Consumer if configured",
"type": "string"
}
}
}
Loading
Loading