Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Consumers Pause #1093

Merged
merged 12 commits into from
Mar 7, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import io.nats.client.Nats;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.ConsumerPauseResponse;
import io.nats.examples.ExampleArgs;
import io.nats.examples.ExampleUtils;

import java.time.ZonedDateTime;
import java.util.List;

import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT;
import static io.nats.examples.jetstream.NatsJsUtils.*;

/**
Expand Down Expand Up @@ -82,16 +85,30 @@ public static void main(String[] args) {
List<String> consumerNames = jsm.getConsumerNames(exArgs.stream);
printObject(consumerNames);

// 4. Delete a consumer, then list them again
// 4. Pause a consumer
System.out.println("\n----------\n4. pauseConsumer");
ZonedDateTime pauseUntil = ZonedDateTime.now(ZONE_ID_GMT).plusSeconds(30);
ConsumerPauseResponse pauseResponse = jsm.pauseConsumer(exArgs.stream, durable1, pauseUntil);
printObject(pauseResponse);
ci = jsm.getConsumerInfo(exArgs.stream, durable1);
printObject(ci);

// 5. Resume a (paused) consumer
System.out.println("\n----------\n5. resumeConsumer");
jsm.resumeConsumer(exArgs.stream, durable1);
ci = jsm.getConsumerInfo(exArgs.stream, durable1);
printObject(ci);

// 6. Delete a consumer, then list them again
// Subsequent calls to deleteStream will throw a
// JetStreamApiException [10014]
System.out.println("\n----------\n3. Delete consumers");
System.out.println("\n----------\n6. Delete consumers");
jsm.deleteConsumer(exArgs.stream, durable1);
consumerNames = jsm.getConsumerNames(exArgs.stream);
printObject(consumerNames);

// 5. Try to delete the consumer again and get the exception
System.out.println("\n----------\n5. Delete consumer again");
// 7. Try to delete the consumer again and get the exception
System.out.println("\n----------\n7. Delete consumer again");
try
{
jsm.deleteConsumer(exArgs.stream, durable1);
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.nats.client.api.*;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;

/**
Expand Down Expand Up @@ -131,6 +132,29 @@ public interface JetStreamManagement {
*/
boolean deleteConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException;

/**
* Pauses a consumer.
* @param streamName name of the stream
* @param consumerName the name of the consumer.
* @param pauseUntil consumer is paused until this time.
* @return PauseResponse the pause response
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data, for instance the consumer does not exist.
*/
ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should be two api here, one that does not take pauseUntil, since it is not required.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean having three methods in the end:

  • pauseConsumer(streamName, consumerName, pauseUntil)
  • pauseConsumer(streamName, consumerName)
  • resumeConsumer(streamName, consumerName)

When not sending a pauseUntil, the consumer does not get paused and should be equivalent to calling resumeConsumer instead. So pauseConsumer is used to pause a consumer until the set date, and resumeConsumer resumes it. Could you explain why you'd expect two api here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I thought no pauseUntil meant pause until resume. So just ignore this comment.


/**
* Resumes a paused consumer.
* @param streamName name of the stream
* @param consumerName the name of the consumer.
* @return true if the resume succeeded
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data, for instance the consumer does not exist.
*/
boolean resumeConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException;

/**
* Gets the info for an existing consumer.
* @param streamName name of the stream
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class ConsumerConfiguration implements JsonSerializable {
protected final Integer maxBatch;
protected final Integer maxBytes;
protected final Integer numReplicas;
protected final ZonedDateTime pauseUntil;
protected final Boolean flowControl;
protected final Boolean headersOnly;
protected final Boolean memStorage;
Expand Down Expand Up @@ -110,6 +111,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.maxBatch = cc.maxBatch;
this.maxBytes = cc.maxBytes;
this.numReplicas = cc.numReplicas;
this.pauseUntil = cc.pauseUntil;
this.flowControl = cc.flowControl;
this.headersOnly = cc.headersOnly;
this.memStorage = cc.memStorage;
Expand Down Expand Up @@ -143,6 +145,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
maxBatch = readInteger(v, MAX_BATCH);
maxBytes = readInteger(v, MAX_BYTES);
numReplicas = readInteger(v, NUM_REPLICAS);
pauseUntil = readDate(v, PAUSE_UNTIL);

flowControl = readBoolean(v, FLOW_CONTROL, null);
headersOnly = readBoolean(v, HEADERS_ONLY, null);
Expand Down Expand Up @@ -187,6 +190,7 @@ protected ConsumerConfiguration(Builder b)
this.maxBatch = b.maxBatch;
this.maxBytes = b.maxBytes;
this.numReplicas = b.numReplicas;
this.pauseUntil = b.pauseUntil;

this.flowControl = b.flowControl;
this.headersOnly = b.headersOnly;
Expand Down Expand Up @@ -229,6 +233,7 @@ public String toJson() {
JsonUtils.addFieldAsNanos(sb, INACTIVE_THRESHOLD, inactiveThreshold);
JsonUtils.addDurations(sb, BACKOFF, backoff);
JsonUtils.addField(sb, NUM_REPLICAS, numReplicas);
JsonUtils.addField(sb, PAUSE_UNTIL, pauseUntil);
JsonUtils.addField(sb, MEM_STORAGE, memStorage);
JsonUtils.addField(sb, METADATA, metadata);
if (filterSubjects != null) {
Expand Down Expand Up @@ -485,6 +490,14 @@ public Map<String, String> getMetadata() {
*/
public int getNumReplicas() { return getOrUnset(numReplicas); }

/**
* Get the time until the consumer is paused.
* @return paused until time
*/
public ZonedDateTime getPauseUntil() {
return pauseUntil;
}

/**
* Gets whether deliver policy of this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
Expand Down Expand Up @@ -663,6 +676,7 @@ public static class Builder {
private Integer maxBatch;
private Integer maxBytes;
private Integer numReplicas;
private ZonedDateTime pauseUntil;

private Boolean flowControl;
private Boolean headersOnly;
Expand Down Expand Up @@ -1142,6 +1156,16 @@ public Builder numReplicas(Integer numReplicas) {
return this;
}

/**
* Sets the time to pause the consumer until.
* @param pauseUntil the time to pause
* @return Builder
*/
public Builder pauseUntil(ZonedDateTime pauseUntil) {
this.pauseUntil = pauseUntil;
return this;
}

/**
* set the headers only flag saying to deliver only the headers of
* messages in the stream and not the bodies
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.nats.client.Message;
import io.nats.client.support.JsonValue;

import java.time.Duration;
import java.time.ZonedDateTime;

import static io.nats.client.support.ApiConstants.*;
Expand All @@ -36,6 +37,8 @@ public class ConsumerInfo extends ApiResponse<ConsumerInfo> {
private final long numWaiting;
private final long numAckPending;
private final long numRedelivered;
private final boolean paused;
private final Duration pauseRemaining;
private final ClusterInfo clusterInfo;
private final boolean pushBound;
private final ZonedDateTime timestamp;
Expand All @@ -59,6 +62,8 @@ public ConsumerInfo(JsonValue vConsumerInfo) {
numRedelivered = readLong(jv, NUM_REDELIVERED, 0);
numPending = readLong(jv, NUM_PENDING, 0);
numWaiting = readLong(jv, NUM_WAITING, 0);
paused = readBoolean(jv, PAUSED, false);
pauseRemaining = readNanos(jv, PAUSE_REMAINING);

clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER));
pushBound = readBoolean(jv, PUSH_BOUND);
Expand Down Expand Up @@ -110,6 +115,14 @@ public long getRedelivered() {
return numRedelivered;
}

public boolean getPaused() {
return paused;
}

public Duration getPauseRemaining() {
return pauseRemaining;
}

public ClusterInfo getClusterInfo() {
return clusterInfo;
}
Expand Down
52 changes: 52 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerPauseRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonUtils;
import java.time.ZonedDateTime;

import static io.nats.client.support.ApiConstants.CONFIG;
import static io.nats.client.support.ApiConstants.PAUSE_UNTIL;
import static io.nats.client.support.ApiConstants.STREAM_NAME;
import static io.nats.client.support.JsonUtils.addField;
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;

/**
* Object used to make a request to pause a consumer. Used Internally
*/
public class ConsumerPauseRequest implements JsonSerializable {
private final ZonedDateTime pauseUntil;

public ConsumerPauseRequest(ZonedDateTime pauseUntil) {
this.pauseUntil = pauseUntil;
}

@Override
public String toJson() {
StringBuilder sb = beginJson();

addField(sb, PAUSE_UNTIL, pauseUntil);

return endJson(sb).toString();
}

@Override
public String toString() {
return "ConsumerPauseRequest{" +
"pauseUntil='" + pauseUntil + "'" +
'}';
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test for this, otherwise coverage goes down. Usually I just assign it and do something like contains("true"). Or just remove it since someone could call toJson

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have removed it

64 changes: 64 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerPauseResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import io.nats.client.Message;
import java.time.Duration;
import java.time.ZonedDateTime;

import static io.nats.client.support.ApiConstants.PAUSED;
import static io.nats.client.support.ApiConstants.PAUSE_REMAINING;
import static io.nats.client.support.ApiConstants.PAUSE_UNTIL;
import static io.nats.client.support.JsonValueUtils.readBoolean;
import static io.nats.client.support.JsonValueUtils.readDate;
import static io.nats.client.support.JsonValueUtils.readLong;
import static io.nats.client.support.JsonValueUtils.readNanos;

public class ConsumerPauseResponse extends ApiResponse<ConsumerPauseResponse> {

private final boolean paused;
private final ZonedDateTime pauseUntil;
private final Duration pauseRemaining;

public ConsumerPauseResponse(Message msg) {
super(msg);
paused = readBoolean(jv, PAUSED);
pauseUntil = readDate(jv, PAUSE_UNTIL);
pauseRemaining = readNanos(jv, PAUSE_REMAINING);
}

/**
* Returns true if the consumer was paused
* @return whether the consumer is paused
*/
public boolean isPaused() {
return paused;
}

/**
* Returns the time until the consumer is paused
* @return pause until time
*/
public ZonedDateTime getPauseUntil() {
return pauseUntil;
}

/**
* Returns how much time is remaining for this consumer to be paused
* @return remaining paused time
*/
public Duration getPauseRemaining() {
return pauseRemaining;
}
}
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ public List<String> getChanges(ConsumerConfiguration serverCc) {
if (maxBatch != null && maxBatch != serverCcc.getMaxBatch()) { changes.add("maxBatch"); }
if (maxBytes != null && maxBytes != serverCcc.getMaxBytes()) { changes.add("maxBytes"); }
if (numReplicas != null && !numReplicas.equals(serverCcc.numReplicas)) { changes.add("numReplicas"); }
if (pauseUntil != null && !pauseUntil.equals(serverCcc.pauseUntil)) { changes.add("pauseUntil"); }

if (ackWait != null && !ackWait.equals(getOrUnset(serverCcc.ackWait))) { changes.add("ackWait"); }
if (idleHeartbeat != null && !idleHeartbeat.equals(getOrUnset(serverCcc.idleHeartbeat))) { changes.add("idleHeartbeat"); }
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.List;

import static io.nats.client.support.Validator.*;
Expand Down Expand Up @@ -142,6 +143,32 @@ public boolean deleteConsumer(String streamName, String consumerName) throws IOE
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
}

/**
* {@inheritDoc}
*/
@Override
public ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException {
validateNotNull(streamName, "Stream Name");
validateNotNull(consumerName, "Consumer Name");
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
ConsumerPauseRequest pauseRequest = new ConsumerPauseRequest(pauseUntil);
Message resp = makeRequestResponseRequired(subj, pauseRequest.serialize(), jso.getRequestTimeout());
return new ConsumerPauseResponse(resp).throwOnHasError();
}

/**
* {@inheritDoc}
*/
@Override
public boolean resumeConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException {
validateNotNull(streamName, "Stream Name");
validateNotNull(consumerName, "Consumer Name");
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
ConsumerPauseResponse response = new ConsumerPauseResponse(resp).throwOnHasError();
return !response.isPaused();
}

/**
* {@inheritDoc}
*/
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ public interface ApiConstants {
String OPT_START_SEQ = "opt_start_seq";
String OPT_START_TIME = "opt_start_time";
String OPTIONS = "options";
String PAUSED = "paused";
String PAUSE_REMAINING = "pause_remaining";
String PAUSE_UNTIL = "pause_until";
String PLACEMENT = "placement";
String PORT = "port";
String PROCESSING_TIME = "processing_time";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public interface NatsJetStreamConstants {
// JSAPI_CONSUMER_DELETE is used to delete consumers.
String JSAPI_CONSUMER_DELETE = "CONSUMER.DELETE.%s.%s";

// JSAPI_CONSUMER_PAUSE is used to pause/resume consumers.
String JSAPI_CONSUMER_PAUSE = "CONSUMER.PAUSE.%s.%s";

// JSAPI_CONSUMER_NAMES is used to return a list of consumer names
String JSAPI_CONSUMER_NAMES = "CONSUMER.NAMES.%s";

Expand Down
Loading
Loading