diff --git a/src/examples/java/io/nats/examples/jetstream/NatsJsManageConsumers.java b/src/examples/java/io/nats/examples/jetstream/NatsJsManageConsumers.java index 49eea97a3..f83662fff 100644 --- a/src/examples/java/io/nats/examples/jetstream/NatsJsManageConsumers.java +++ b/src/examples/java/io/nats/examples/jetstream/NatsJsManageConsumers.java @@ -19,11 +19,14 @@ import io.nats.client.Nats; import io.nats.client.api.ConsumerConfiguration; import io.nats.client.api.ConsumerInfo; +import io.nats.client.api.ConsumerPauseResponse; import io.nats.examples.ExampleArgs; import io.nats.examples.ExampleUtils; +import java.time.ZonedDateTime; import java.util.List; +import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT; import static io.nats.examples.jetstream.NatsJsUtils.*; /** @@ -82,16 +85,30 @@ public static void main(String[] args) { List consumerNames = jsm.getConsumerNames(exArgs.stream); printObject(consumerNames); - // 4. Delete a consumer, then list them again + // 4. Pause a consumer + System.out.println("\n----------\n4. pauseConsumer"); + ZonedDateTime pauseUntil = ZonedDateTime.now(ZONE_ID_GMT).plusSeconds(30); + ConsumerPauseResponse pauseResponse = jsm.pauseConsumer(exArgs.stream, durable1, pauseUntil); + printObject(pauseResponse); + ci = jsm.getConsumerInfo(exArgs.stream, durable1); + printObject(ci); + + // 5. Resume a (paused) consumer + System.out.println("\n----------\n5. resumeConsumer"); + jsm.resumeConsumer(exArgs.stream, durable1); + ci = jsm.getConsumerInfo(exArgs.stream, durable1); + printObject(ci); + + // 6. Delete a consumer, then list them again // Subsequent calls to deleteStream will throw a // JetStreamApiException [10014] - System.out.println("\n----------\n3. Delete consumers"); + System.out.println("\n----------\n6. Delete consumers"); jsm.deleteConsumer(exArgs.stream, durable1); consumerNames = jsm.getConsumerNames(exArgs.stream); printObject(consumerNames); - // 5. Try to delete the consumer again and get the exception - System.out.println("\n----------\n5. Delete consumer again"); + // 7. Try to delete the consumer again and get the exception + System.out.println("\n----------\n7. Delete consumer again"); try { jsm.deleteConsumer(exArgs.stream, durable1); @@ -101,6 +118,17 @@ public static void main(String[] args) { System.out.println("Exception was: '" + e.getMessage() + "'"); } + // 8. Try to pause a consumer that does not exist, and you will get an exception + System.out.println("\n----------\n8. Pause non-existent consumer ."); + try + { + jsm.pauseConsumer(exArgs.stream, durable1, ZonedDateTime.now()); + } + catch (JetStreamApiException e) + { + System.out.println("Exception was: '" + e.getMessage() + "'"); + } + System.out.println("\n----------"); // delete the stream since we are done with it. diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index 82c2ef922..623e8b341 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -15,6 +15,7 @@ import io.nats.client.api.*; import java.io.IOException; +import java.time.ZonedDateTime; import java.util.List; /** @@ -131,6 +132,29 @@ public interface JetStreamManagement { */ boolean deleteConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException; + /** + * Pauses a consumer. + * @param streamName name of the stream + * @param consumerName the name of the consumer. + * @param pauseUntil consumer is paused until this time. + * @return ConsumerPauseResponse the pause response + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data, for instance the consumer does not exist. + */ + ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException; + + /** + * Resumes a paused consumer. + * @param streamName name of the stream + * @param consumerName the name of the consumer. + * @return true if the resume succeeded + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data, for instance the consumer does not exist. + */ + boolean resumeConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException; + /** * Gets the info for an existing consumer. * @param streamName name of the stream diff --git a/src/main/java/io/nats/client/api/ConsumerConfiguration.java b/src/main/java/io/nats/client/api/ConsumerConfiguration.java index 0eab945fa..591b9b483 100644 --- a/src/main/java/io/nats/client/api/ConsumerConfiguration.java +++ b/src/main/java/io/nats/client/api/ConsumerConfiguration.java @@ -80,6 +80,7 @@ public class ConsumerConfiguration implements JsonSerializable { protected final Integer maxBatch; protected final Integer maxBytes; protected final Integer numReplicas; + protected final ZonedDateTime pauseUntil; protected final Boolean flowControl; protected final Boolean headersOnly; protected final Boolean memStorage; @@ -110,6 +111,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) { this.maxBatch = cc.maxBatch; this.maxBytes = cc.maxBytes; this.numReplicas = cc.numReplicas; + this.pauseUntil = cc.pauseUntil; this.flowControl = cc.flowControl; this.headersOnly = cc.headersOnly; this.memStorage = cc.memStorage; @@ -143,6 +145,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) { maxBatch = readInteger(v, MAX_BATCH); maxBytes = readInteger(v, MAX_BYTES); numReplicas = readInteger(v, NUM_REPLICAS); + pauseUntil = readDate(v, PAUSE_UNTIL); flowControl = readBoolean(v, FLOW_CONTROL, null); headersOnly = readBoolean(v, HEADERS_ONLY, null); @@ -187,6 +190,7 @@ protected ConsumerConfiguration(Builder b) this.maxBatch = b.maxBatch; this.maxBytes = b.maxBytes; this.numReplicas = b.numReplicas; + this.pauseUntil = b.pauseUntil; this.flowControl = b.flowControl; this.headersOnly = b.headersOnly; @@ -229,6 +233,7 @@ public String toJson() { JsonUtils.addFieldAsNanos(sb, INACTIVE_THRESHOLD, inactiveThreshold); JsonUtils.addDurations(sb, BACKOFF, backoff); JsonUtils.addField(sb, NUM_REPLICAS, numReplicas); + JsonUtils.addField(sb, PAUSE_UNTIL, pauseUntil); JsonUtils.addField(sb, MEM_STORAGE, memStorage); JsonUtils.addField(sb, METADATA, metadata); if (filterSubjects != null) { @@ -485,6 +490,14 @@ public Map getMetadata() { */ public int getNumReplicas() { return getOrUnset(numReplicas); } + /** + * Get the time until the consumer is paused. + * @return paused until time + */ + public ZonedDateTime getPauseUntil() { + return pauseUntil; + } + /** * Gets whether deliver policy of this consumer configuration was set or left unset * @return true if the policy was set, false if the policy was not set @@ -663,6 +676,7 @@ public static class Builder { private Integer maxBatch; private Integer maxBytes; private Integer numReplicas; + private ZonedDateTime pauseUntil; private Boolean flowControl; private Boolean headersOnly; @@ -1142,6 +1156,16 @@ public Builder numReplicas(Integer numReplicas) { return this; } + /** + * Sets the time to pause the consumer until. + * @param pauseUntil the time to pause + * @return Builder + */ + public Builder pauseUntil(ZonedDateTime pauseUntil) { + this.pauseUntil = pauseUntil; + return this; + } + /** * set the headers only flag saying to deliver only the headers of * messages in the stream and not the bodies diff --git a/src/main/java/io/nats/client/api/ConsumerInfo.java b/src/main/java/io/nats/client/api/ConsumerInfo.java index 1a1276f69..3b642b832 100644 --- a/src/main/java/io/nats/client/api/ConsumerInfo.java +++ b/src/main/java/io/nats/client/api/ConsumerInfo.java @@ -16,6 +16,7 @@ import io.nats.client.Message; import io.nats.client.support.JsonValue; +import java.time.Duration; import java.time.ZonedDateTime; import static io.nats.client.support.ApiConstants.*; @@ -36,6 +37,8 @@ public class ConsumerInfo extends ApiResponse { private final long numWaiting; private final long numAckPending; private final long numRedelivered; + private final boolean paused; + private final Duration pauseRemaining; private final ClusterInfo clusterInfo; private final boolean pushBound; private final ZonedDateTime timestamp; @@ -59,6 +62,8 @@ public ConsumerInfo(JsonValue vConsumerInfo) { numRedelivered = readLong(jv, NUM_REDELIVERED, 0); numPending = readLong(jv, NUM_PENDING, 0); numWaiting = readLong(jv, NUM_WAITING, 0); + paused = readBoolean(jv, PAUSED, false); + pauseRemaining = readNanos(jv, PAUSE_REMAINING); clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER)); pushBound = readBoolean(jv, PUSH_BOUND); @@ -110,6 +115,14 @@ public long getRedelivered() { return numRedelivered; } + public boolean getPaused() { + return paused; + } + + public Duration getPauseRemaining() { + return pauseRemaining; + } + public ClusterInfo getClusterInfo() { return clusterInfo; } diff --git a/src/main/java/io/nats/client/api/ConsumerPauseRequest.java b/src/main/java/io/nats/client/api/ConsumerPauseRequest.java new file mode 100644 index 000000000..8badc6cbb --- /dev/null +++ b/src/main/java/io/nats/client/api/ConsumerPauseRequest.java @@ -0,0 +1,45 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.api; + +import io.nats.client.support.JsonSerializable; +import io.nats.client.support.JsonUtils; +import java.time.ZonedDateTime; + +import static io.nats.client.support.ApiConstants.CONFIG; +import static io.nats.client.support.ApiConstants.PAUSE_UNTIL; +import static io.nats.client.support.ApiConstants.STREAM_NAME; +import static io.nats.client.support.JsonUtils.addField; +import static io.nats.client.support.JsonUtils.beginJson; +import static io.nats.client.support.JsonUtils.endJson; + +/** + * Object used to make a request to pause a consumer. Used Internally + */ +public class ConsumerPauseRequest implements JsonSerializable { + private final ZonedDateTime pauseUntil; + + public ConsumerPauseRequest(ZonedDateTime pauseUntil) { + this.pauseUntil = pauseUntil; + } + + @Override + public String toJson() { + StringBuilder sb = beginJson(); + + addField(sb, PAUSE_UNTIL, pauseUntil); + + return endJson(sb).toString(); + } +} diff --git a/src/main/java/io/nats/client/api/ConsumerPauseResponse.java b/src/main/java/io/nats/client/api/ConsumerPauseResponse.java new file mode 100644 index 000000000..ad3cb9b57 --- /dev/null +++ b/src/main/java/io/nats/client/api/ConsumerPauseResponse.java @@ -0,0 +1,64 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.api; + +import io.nats.client.Message; +import java.time.Duration; +import java.time.ZonedDateTime; + +import static io.nats.client.support.ApiConstants.PAUSED; +import static io.nats.client.support.ApiConstants.PAUSE_REMAINING; +import static io.nats.client.support.ApiConstants.PAUSE_UNTIL; +import static io.nats.client.support.JsonValueUtils.readBoolean; +import static io.nats.client.support.JsonValueUtils.readDate; +import static io.nats.client.support.JsonValueUtils.readLong; +import static io.nats.client.support.JsonValueUtils.readNanos; + +public class ConsumerPauseResponse extends ApiResponse { + + private final boolean paused; + private final ZonedDateTime pauseUntil; + private final Duration pauseRemaining; + + public ConsumerPauseResponse(Message msg) { + super(msg); + paused = readBoolean(jv, PAUSED); + pauseUntil = readDate(jv, PAUSE_UNTIL); + pauseRemaining = readNanos(jv, PAUSE_REMAINING); + } + + /** + * Returns true if the consumer was paused + * @return whether the consumer is paused + */ + public boolean isPaused() { + return paused; + } + + /** + * Returns the time until the consumer is paused + * @return pause until time + */ + public ZonedDateTime getPauseUntil() { + return pauseUntil; + } + + /** + * Returns how much time is remaining for this consumer to be paused + * @return remaining paused time + */ + public Duration getPauseRemaining() { + return pauseRemaining; + } +} diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index ad152d1d7..27660ccc2 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -516,6 +516,7 @@ public List getChanges(ConsumerConfiguration serverCc) { if (maxBatch != null && maxBatch != serverCcc.getMaxBatch()) { changes.add("maxBatch"); } if (maxBytes != null && maxBytes != serverCcc.getMaxBytes()) { changes.add("maxBytes"); } if (numReplicas != null && !numReplicas.equals(serverCcc.numReplicas)) { changes.add("numReplicas"); } + if (pauseUntil != null && !pauseUntil.equals(serverCcc.pauseUntil)) { changes.add("pauseUntil"); } if (ackWait != null && !ackWait.equals(getOrUnset(serverCcc.ackWait))) { changes.add("ackWait"); } if (idleHeartbeat != null && !idleHeartbeat.equals(getOrUnset(serverCcc.idleHeartbeat))) { changes.add("idleHeartbeat"); } diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index 524607c58..07da98067 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.ZonedDateTime; import java.util.List; import static io.nats.client.support.Validator.*; @@ -142,6 +143,32 @@ public boolean deleteConsumer(String streamName, String consumerName) throws IOE return new SuccessApiResponse(resp).throwOnHasError().getSuccess(); } + /** + * {@inheritDoc} + */ + @Override + public ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException { + validateNotNull(streamName, "Stream Name"); + validateNotNull(consumerName, "Consumer Name"); + String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName); + ConsumerPauseRequest pauseRequest = new ConsumerPauseRequest(pauseUntil); + Message resp = makeRequestResponseRequired(subj, pauseRequest.serialize(), jso.getRequestTimeout()); + return new ConsumerPauseResponse(resp).throwOnHasError(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean resumeConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException { + validateNotNull(streamName, "Stream Name"); + validateNotNull(consumerName, "Consumer Name"); + String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName); + Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout()); + ConsumerPauseResponse response = new ConsumerPauseResponse(resp).throwOnHasError(); + return !response.isPaused(); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index f40c70fbb..8a685af6f 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -145,6 +145,9 @@ public interface ApiConstants { String OPT_START_SEQ = "opt_start_seq"; String OPT_START_TIME = "opt_start_time"; String OPTIONS = "options"; + String PAUSED = "paused"; + String PAUSE_REMAINING = "pause_remaining"; + String PAUSE_UNTIL = "pause_until"; String PLACEMENT = "placement"; String PORT = "port"; String PROCESSING_TIME = "processing_time"; diff --git a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java index bcf04d285..45aed12b2 100644 --- a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java +++ b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java @@ -38,6 +38,9 @@ public interface NatsJetStreamConstants { // JSAPI_CONSUMER_DELETE is used to delete consumers. String JSAPI_CONSUMER_DELETE = "CONSUMER.DELETE.%s.%s"; + // JSAPI_CONSUMER_PAUSE is used to pause/resume consumers. + String JSAPI_CONSUMER_PAUSE = "CONSUMER.PAUSE.%s.%s"; + // JSAPI_CONSUMER_NAMES is used to return a list of consumer names String JSAPI_CONSUMER_NAMES = "CONSUMER.NAMES.%s"; diff --git a/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java b/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java index 55b175a67..5ba55a8af 100644 --- a/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java +++ b/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java @@ -59,6 +59,7 @@ public void testBuilder() { .maxBytes(56) .maxExpires(77000) // duration .numReplicas(5) + .pauseUntil(zdt) .inactiveThreshold(88000) // duration .headersOnly(true) .memStorage(true) @@ -234,6 +235,7 @@ private void assertAsBuilt(ConsumerConfiguration c, ZonedDateTime zdt) { assertEquals(Duration.ofSeconds(77), c.getMaxExpires()); assertEquals(Duration.ofSeconds(88), c.getInactiveThreshold()); assertEquals(5, c.getNumReplicas()); + assertEquals(zdt, c.getPauseUntil()); assertTrue(c.isHeadersOnly()); assertTrue(c.isMemStorage()); assertTrue(c.deliverPolicyWasSet()); diff --git a/src/test/java/io/nats/client/api/ConsumerInfoTests.java b/src/test/java/io/nats/client/api/ConsumerInfoTests.java index 76a9f27b8..9c7d98a47 100644 --- a/src/test/java/io/nats/client/api/ConsumerInfoTests.java +++ b/src/test/java/io/nats/client/api/ConsumerInfoTests.java @@ -60,6 +60,8 @@ public void testConsumerInfo() { assertEquals(24, ci.getNumPending()); assertEquals(42, ci.getNumAckPending()); assertEquals(42, ci.getRedelivered()); + assertTrue(ci.getPaused()); + assertEquals(Duration.ofSeconds(20), ci.getPauseRemaining()); ConsumerConfiguration c = ci.getConsumerConfiguration(); assertEquals("foo-consumer", c.getDurable()); @@ -69,6 +71,7 @@ public void testConsumerInfo() { assertEquals(Duration.ofSeconds(30), c.getAckWait()); assertEquals(10, c.getMaxDeliver()); assertEquals(ReplayPolicy.Original, c.getReplayPolicy()); + assertEquals(DateTimeUtils.parseDateTime("2024-03-02T10:43:32.062847087Z"), c.getPauseUntil()); ClusterInfo clusterInfo = ci.getClusterInfo(); assertNotNull(clusterInfo); diff --git a/src/test/java/io/nats/client/api/ResponseTests.java b/src/test/java/io/nats/client/api/ResponseTests.java index e64c34456..4429ea313 100644 --- a/src/test/java/io/nats/client/api/ResponseTests.java +++ b/src/test/java/io/nats/client/api/ResponseTests.java @@ -14,6 +14,8 @@ package io.nats.client.api; import io.nats.client.impl.JetStreamTestBase; +import io.nats.client.support.DateTimeUtils; +import java.time.Duration; import org.junit.jupiter.api.Test; import static io.nats.client.utils.ResourceUtils.dataAsString; @@ -31,4 +33,24 @@ public void testPurgeResponse() { assertEquals(5, pr.getPurgedCount()); // coverage for deprecated assertNotNull(pr.toString()); // COVERAGE } + + @Test + public void testPauseResponse() { + String json = dataAsString("ConsumerPauseResponse.json"); + ConsumerPauseResponse pr = new ConsumerPauseResponse(getDataMessage(json)); + assertTrue(pr.isPaused()); + assertEquals(DateTimeUtils.parseDateTime("2024-03-02T13:21:45.198423724Z"), pr.getPauseUntil()); + assertEquals(Duration.ofSeconds(30), pr.getPauseRemaining()); + assertNotNull(pr.toString()); // COVERAGE + } + + @Test + public void testPauseResumeResponse() { + String json = dataAsString("ConsumerResumeResponse.json"); + ConsumerPauseResponse pr = new ConsumerPauseResponse(getDataMessage(json)); + assertFalse(pr.isPaused()); + assertEquals(DateTimeUtils.parseDateTime("0001-01-01T00:00:00Z"), pr.getPauseUntil()); + assertNull(pr.getPauseRemaining()); + assertNotNull(pr.toString()); // COVERAGE + } } diff --git a/src/test/java/io/nats/client/impl/ConsumerConfigurationComparerTests.java b/src/test/java/io/nats/client/impl/ConsumerConfigurationComparerTests.java index 65c460326..03f3905dd 100644 --- a/src/test/java/io/nats/client/impl/ConsumerConfigurationComparerTests.java +++ b/src/test/java/io/nats/client/impl/ConsumerConfigurationComparerTests.java @@ -131,6 +131,9 @@ public void testChangeFieldsIdentified() { assertNotChange(builder(orig).numReplicas(null).build(), orig); assertChange(builder(orig).numReplicas(1).build(), orig, "numReplicas"); + assertNotChange(builder(orig).pauseUntil(null).build(), orig); + assertChange(builder(orig).pauseUntil(ZonedDateTime.now()).build(), orig, "pauseUntil"); + assertNotChange(builder(orig).filterSubject(EMPTY).build(), orig); ccTest = builder(orig).filterSubject(PLAIN).build(); assertNotChange(ccTest, ccTest); diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 279ebe4aa..419f39135 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; +import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME; +import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT; import static io.nats.client.support.NatsJetStreamConstants.*; import static io.nats.client.utils.ResourceUtils.dataAsString; import static org.junit.jupiter.api.Assertions.*; @@ -751,6 +753,83 @@ public void testAddDeleteConsumer() throws Exception { }); } + @Test + public void testAddPausedConsumer() throws Exception { + jsServer.run(TestBase::atLeast2_11, nc -> { + JetStreamManagement jsm = nc.jetStreamManagement(); + TestingStreamContainer tsc = new TestingStreamContainer(jsm); + + List list = jsm.getConsumers(tsc.stream); + assertEquals(0, list.size()); + + ZonedDateTime pauseUntil = ZonedDateTime.now(ZONE_ID_GMT).plusMinutes(2); + ConsumerConfiguration cc = ConsumerConfiguration.builder() + .durable(tsc.name()) + .pauseUntil(pauseUntil) + .build(); + + // Consumer should be paused on creation. + ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc); + assertTrue(ci.getPaused()); + assertTrue(ci.getPauseRemaining().toMillis() > 60_000); + assertEquals(pauseUntil, ci.getConsumerConfiguration().getPauseUntil()); + }); + } + + @Test + public void testPauseResumeConsumer() throws Exception { + jsServer.run(TestBase::atLeast2_11, nc -> { + JetStreamManagement jsm = nc.jetStreamManagement(); + TestingStreamContainer tsc = new TestingStreamContainer(jsm); + + List list = jsm.getConsumers(tsc.stream); + assertEquals(0, list.size()); + + ConsumerConfiguration cc = ConsumerConfiguration.builder() + .durable(tsc.name()) + .build(); + + // durable and name can both be null + ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc); + assertNotNull(ci.getName()); + + // pause consumer + ZonedDateTime pauseUntil = ZonedDateTime.now(ZONE_ID_GMT).plusMinutes(2); + ConsumerPauseResponse pauseResponse = jsm.pauseConsumer(tsc.stream, ci.getName(), pauseUntil); + assertTrue(pauseResponse.isPaused()); + assertEquals(pauseUntil, pauseResponse.getPauseUntil()); + + ci = jsm.getConsumerInfo(tsc.stream, ci.getName()); + assertTrue(ci.getPaused()); + assertTrue(ci.getPauseRemaining().toMillis() > 60_000); + + // resume consumer + boolean isResumed = jsm.resumeConsumer(tsc.stream, ci.getName()); + assertTrue(isResumed); + + ci = jsm.getConsumerInfo(tsc.stream, ci.getName()); + assertFalse(ci.getPaused()); + + // pause again + pauseResponse = jsm.pauseConsumer(tsc.stream, ci.getName(), pauseUntil); + assertTrue(pauseResponse.isPaused()); + assertEquals(pauseUntil, pauseResponse.getPauseUntil()); + + // resume via pause with no date + pauseResponse = jsm.pauseConsumer(tsc.stream, ci.getName(), null); + assertFalse(pauseResponse.isPaused()); + assertEquals(DEFAULT_TIME, pauseResponse.getPauseUntil()); + + ci = jsm.getConsumerInfo(tsc.stream, ci.getName()); + assertFalse(ci.getPaused()); + + assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(stream(), tsc.name(), pauseUntil)); + assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(tsc.stream, name(), pauseUntil)); + assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(stream(), tsc.name())); + assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(tsc.stream, name())); + }); + } + private static void addConsumer(JetStreamManagement jsm, boolean atLeast2dot9, int id, boolean deliver, String fs, ConsumerConfiguration cc) throws IOException, JetStreamApiException { ConsumerInfo ci = jsm.addOrUpdateConsumer(STREAM, cc); assertEquals(durable(id), ci.getName()); diff --git a/src/test/java/io/nats/client/utils/TestBase.java b/src/test/java/io/nats/client/utils/TestBase.java index d8974d890..f4d248262 100644 --- a/src/test/java/io/nats/client/utils/TestBase.java +++ b/src/test/java/io/nats/client/utils/TestBase.java @@ -125,6 +125,10 @@ public static boolean atLeast2_10_3(ServerInfo si) { return si.isSameOrNewerThanVersion("2.10.3"); } + public static boolean atLeast2_11(ServerInfo si) { + return si.isNewerVersionThan("2.10.99"); + } + public static void runInServer(InServerTest inServerTest) throws Exception { runInServer(false, false, null, null, inServerTest); } diff --git a/src/test/resources/data/ConsumerInfo.json b/src/test/resources/data/ConsumerInfo.json index 0abc6702e..81f036af5 100644 --- a/src/test/resources/data/ConsumerInfo.json +++ b/src/test/resources/data/ConsumerInfo.json @@ -31,7 +31,8 @@ "inactive_threshold": 50000000000, "backoff": [1000000000, 2000000000, 3000000000], "num_replicas": 5, - "mem_storage": true + "mem_storage": true, + "pause_until": "2024-03-02T10:43:32.062847087Z" }, "delivered": { "consumer_seq": 1, @@ -46,6 +47,8 @@ "num_pending": 24, "num_ack_pending": 42, "num_redelivered": 42, + "paused": true, + "pause_remaining": 20000000000, "cluster": { "name": "clustername", "leader": "clusterleader", diff --git a/src/test/resources/data/ConsumerPauseResponse.json b/src/test/resources/data/ConsumerPauseResponse.json new file mode 100644 index 000000000..c21b34d2a --- /dev/null +++ b/src/test/resources/data/ConsumerPauseResponse.json @@ -0,0 +1,6 @@ +{ + "type": "io.nats.jetstream.api.v1.consumer_pause_response", + "paused": true, + "pause_until": "2024-03-02T13:21:45.198423724Z", + "pause_remaining": 30000000000 +} diff --git a/src/test/resources/data/ConsumerResumeResponse.json b/src/test/resources/data/ConsumerResumeResponse.json new file mode 100644 index 000000000..4ceb54232 --- /dev/null +++ b/src/test/resources/data/ConsumerResumeResponse.json @@ -0,0 +1,5 @@ +{ + "type": "io.nats.jetstream.api.v1.consumer_pause_response", + "paused": false, + "pause_until": "0001-01-01T00:00:00Z" +}