Skip to content

Commit

Permalink
Merge pull request #490 from nats-io/queue-ack-js-misc
Browse files Browse the repository at this point in the history
new queue, ack format, misc js
  • Loading branch information
scottf committed Aug 31, 2021
2 parents f7fe576 + d12ea32 commit c9ca356
Show file tree
Hide file tree
Showing 20 changed files with 539 additions and 134 deletions.
2 changes: 1 addition & 1 deletion src/NATS.Client/Internals/JsonUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ internal static List<string> OptionalStringList(JSONNode node, String field)
}

internal static byte[] AsByteArrayFromBase64(JSONNode node) {
return string.IsNullOrEmpty(node.Value) ? null : Convert.FromBase64String(node.Value);
return string.IsNullOrWhiteSpace(node.Value) ? null : Convert.FromBase64String(node.Value);
}

internal static DateTime AsDate(JSONNode node)
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/Internals/SimpleJson.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ internal partial class JSONString : JSONNode

public override JSONNodeType Tag { get { return JSONNodeType.String; } }
public override bool IsString { get { return true; } }
public override bool ShouldWrite { get { return !string.IsNullOrEmpty(m_Data); } }
public override bool ShouldWrite { get { return !string.IsNullOrWhiteSpace(m_Data); } }

public override Enumerator GetEnumerator() { return new Enumerator(); }

Expand Down
24 changes: 23 additions & 1 deletion src/NATS.Client/Internals/Validator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,28 @@ internal static string ValidateDurableRequired(string durable, ConsumerConfigura
"Durable is required and cannot contain a '.', '*' or '>' [null]");
}

internal static String ValidateMustMatchIfBothSupplied(String s1, String s2, String label1, String label2) {
// s1 | s2 || result
// ---- | ---- || --------------
// null | null || valid, null s2
// null | y || valid, y s2
// x | null || valid, x s1
// x | x || valid, x s1
// x | y || invalid
s1 = EmptyAsNull(s1);
s2 = EmptyAsNull(s2);
if (s1 == null) {
return s2; // s2 can be either null or y
}

// x / null or x / x
if (s2 == null || s1.Equals(s2)) {
return s1;
}

throw new ArgumentException($"{label1} [{s1}] must match the {label2} [{s2}] if both are provided.");
}

public static string Validate(string s, bool required, string label, Func<string> check)
{
string preCheck = EmptyAsNull(s);
Expand Down Expand Up @@ -276,7 +298,7 @@ private static bool NotPrintableOrHasWildGtDollar(String s) {

internal static string EmptyAsNull(string s)
{
return string.IsNullOrEmpty(s) ? null : s;
return string.IsNullOrWhiteSpace(s) ? null : s;
}

internal static bool ZeroOrLtMinus1(long l)
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client/JetStream/ApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ internal static class ApiConstants
internal const string Current = "current";
internal const string Data = "data";
internal const string Deliver = "deliver";
internal const string DeliverGroup = "deliver_group";
internal const string DeliverPolicy = "deliver_policy";
internal const string DeliverSubject = "deliver_subject";
internal const string Delivered = "delivered";
Expand Down Expand Up @@ -95,6 +96,7 @@ internal static class ApiConstants
internal const string Port = "port";
internal const string Proto = "proto";
internal const string Purged = "purged";
internal const string PushBound = "push_bound";
internal const string RateLimitBps = "rate_limit_bps";
internal const string ReplayPolicy = "replay_policy";
internal const string Replica = "replica";
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/ApiResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal ApiResponse(string json, bool throwOnError = false)
{
JsonNode = JSON.Parse(json);
Type = JsonNode[ApiConstants.Type].Value;
if (string.IsNullOrEmpty(Type))
if (string.IsNullOrWhiteSpace(Type))
{
Type = NoType;
}
Expand Down
21 changes: 20 additions & 1 deletion src/NATS.Client/JetStream/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public sealed class ConsumerConfiguration : JsonSerializable
public string Description { get; }
public string Durable { get; }
public string DeliverSubject { get; }
public string DeliverGroup { get; }
public ulong StartSeq { get; }
public DateTime StartTime { get; }
public Duration AckWait { get; }
Expand All @@ -51,6 +52,7 @@ internal ConsumerConfiguration(JSONNode ccNode)
Description = ccNode[ApiConstants.Description].Value;
Durable = ccNode[ApiConstants.DurableName].Value;
DeliverSubject = ccNode[ApiConstants.DeliverSubject].Value;
DeliverGroup = ccNode[ApiConstants.DeliverGroup].Value;
StartSeq = ccNode[ApiConstants.OptStartSeq].AsUlong;
StartTime = JsonUtils.AsDate(ccNode[ApiConstants.OptStartTime]);
AckWait = JsonUtils.AsDuration(ccNode, ApiConstants.AckWait, DefaultAckWait);
Expand All @@ -66,7 +68,7 @@ internal ConsumerConfiguration(JSONNode ccNode)

internal ConsumerConfiguration(string description, string durable, DeliverPolicy deliverPolicy, ulong startSeq, DateTime startTime,
AckPolicy ackPolicy, Duration ackWait, long maxDeliver, string filterSubject, ReplayPolicy replayPolicy,
string sampleFrequency, long rateLimit, string deliverSubject, long maxAckPending,
string sampleFrequency, long rateLimit, string deliverSubject, string deliverGroup, long maxAckPending,
Duration idleHeartbeat, bool flowControl, long maxPullWaiting)
{
Description = description;
Expand All @@ -82,6 +84,7 @@ internal ConsumerConfiguration(string description, string durable, DeliverPolicy
SampleFrequency = sampleFrequency;
RateLimit = rateLimit;
DeliverSubject = deliverSubject;
DeliverGroup = deliverGroup;
MaxAckPending = maxAckPending;
IdleHeartbeat = idleHeartbeat;
FlowControl = flowControl;
Expand All @@ -96,6 +99,7 @@ internal override JSONNode ToJsonNode()
[ApiConstants.DurableName] = Durable,
[ApiConstants.DeliverPolicy] = DeliverPolicy.GetString(),
[ApiConstants.DeliverSubject] = DeliverSubject,
[ApiConstants.DeliverGroup] = DeliverGroup,
[ApiConstants.OptStartSeq] = StartSeq,
[ApiConstants.OptStartTime] = JsonUtils.ToString(StartTime),
[ApiConstants.AckPolicy] = AckPolicy.GetString(),
Expand Down Expand Up @@ -130,6 +134,7 @@ public sealed class ConsumerConfigurationBuilder
private string _description;
private string _durable;
private string _deliverSubject;
private string _deliverGroup;
private ulong _startSeq;
private DateTime _startTime;
private Duration _ackWait = Duration.OfSeconds(30);
Expand All @@ -144,6 +149,7 @@ public sealed class ConsumerConfigurationBuilder

public string Durable => _durable;
public string DeliverSubject => _deliverSubject;
public string DeliverGroup => _deliverGroup;
public string FilterSubject => _filterSubject;
public long MaxAckPending => _maxAckPending;
public AckPolicy AcknowledgementPolicy => _ackPolicy;
Expand All @@ -166,6 +172,7 @@ public ConsumerConfigurationBuilder(ConsumerConfiguration cc)
_sampleFrequency = cc.SampleFrequency;
_rateLimit = cc.RateLimit;
_deliverSubject = cc.DeliverSubject;
_deliverGroup = cc.DeliverGroup;
_maxAckPending = cc.MaxAckPending;
_idleHeartbeat = cc.IdleHeartbeat;
_flowControl = cc.FlowControl;
Expand Down Expand Up @@ -216,6 +223,17 @@ public ConsumerConfigurationBuilder WithDeliverSubject(string subject)
return this;
}

/// <summary>
/// Sets the group to deliver messages to.
/// </summary>
/// <param name="group">the delivery group.</param>
/// <returns>The ConsumerConfigurationBuilder</returns>
public ConsumerConfigurationBuilder WithDeliverGroup(string group)
{
_deliverGroup = group;
return this;
}

/// <summary>
/// Sets the start sequence of the ConsumerConfiguration.
/// </summary>
Expand Down Expand Up @@ -400,6 +418,7 @@ public ConsumerConfiguration Build()
_sampleFrequency,
_rateLimit,
_deliverSubject,
_deliverGroup,
_maxAckPending,
_idleHeartbeat,
_flowControl,
Expand Down
4 changes: 4 additions & 0 deletions src/NATS.Client/JetStream/ConsumerInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public sealed class ConsumerInfo : ApiResponse
public long NumWaiting { get; private set; }
public long NumAckPending { get; private set; }
public long NumRedelivered { get; private set; }
public ClusterInfo ClusterInfo { get; private set; }
public bool PushBound { get; private set; }

internal ConsumerInfo(Msg msg, bool throwOnError) : base(msg, throwOnError)
{
Expand Down Expand Up @@ -57,6 +59,8 @@ private void Init(JSONNode ciNode)
NumWaiting = ciNode[ApiConstants.NumWaiting].AsLong;
NumAckPending = ciNode[ApiConstants.NumAckPending].AsLong;
NumRedelivered = ciNode[ApiConstants.NumRedelivered].AsLong;
ClusterInfo = ClusterInfo.OptionalInstance(ciNode[ApiConstants.Cluster]);
PushBound = ciNode[ApiConstants.PushBound].AsBool;
}
}
}
68 changes: 47 additions & 21 deletions src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,27 @@ Subscription CreateSubscription(string subject, string queueName,
SubscribeOptions so;

if (isPullMode) {
so = pullOpts;
so = pullOpts; // options must have already been checked to be non null
stream = pullOpts.Stream;
ccBuilder = Builder(pullOpts.ConsumerConfiguration);
ccBuilder.WithDeliverSubject(null); // pull mode can't have a deliver subject
// queueName is already null
ccBuilder.WithDeliverGroup(null); // pull mode can't have a deliver group
}
else {
so = pushOpts == null
? PushSubscribeOptions.Builder().Build()
: pushOpts;
so = pushOpts ?? PushSubscribeOptions.Builder().Build();
stream = so.Stream; // might be null, that's ok (see direct)
ccBuilder = Builder(so.ConsumerConfiguration);
ccBuilder.WithMaxPullWaiting(0); // this does not apply to push, in fact will error b/c deliver subject will be set
// deliver subject does not have to be cleared
// figure out the queue name
queueName = Validator.ValidateMustMatchIfBothSupplied(ccBuilder.DeliverGroup, queueName,
"Consumer Configuration DeliverGroup", "Queue Name");
ccBuilder.WithDeliverGroup(queueName); // and set it in case the deliver group was null
}

//
bool direct = so.Direct;
bool bindMode = so.Bind;

string durable = ccBuilder.Durable;
string inbox = ccBuilder.DeliverSubject;
Expand All @@ -176,27 +182,47 @@ Subscription CreateSubscription(string subject, string queueName,

// 2. Is this a durable or ephemeral
if (durable != null) {
ConsumerInfo consumerInfo =
ConsumerInfo lookedUpInfo =
LookupConsumerInfo(stream, durable);

if (consumerInfo != null) { // the consumer for that durable already exists
if (lookedUpInfo != null) { // the consumer for that durable already exists
createConsumer = false;
ConsumerConfiguration cc = consumerInfo.Configuration;
ConsumerConfiguration lookedUpConfig = lookedUpInfo.Configuration;

// durable already exists, make sure the filter subject matches
string existingFilterSubject = cc.FilterSubject;
if (filterSubject != null && !filterSubject.Equals(existingFilterSubject)) {
string lookedUp = Validator.EmptyAsNull(lookedUpConfig.FilterSubject);
if (filterSubject != null && !filterSubject.Equals(lookedUp)) {
throw new ArgumentException(
$"Subject {subject} mismatches consumer configuration {filterSubject}.");
}

filterSubject = existingFilterSubject;

// use the deliver subject as the inbox. It may be null, that's ok
inbox = cc.DeliverSubject;
filterSubject = lookedUp;

lookedUp = Validator.EmptyAsNull(lookedUpConfig.DeliverGroup);
if (lookedUp == null) {
// lookedUp was null, means existing consumer is not a queue consumer
if (queueName == null) {
// ok fine, no queue requested and the existing consumer is also not a queue consumer
// we must check if the consumer is in use though
if (lookedUpInfo.PushBound) {
throw new ArgumentException($"Consumer [{durable}] is already bound to a subscription.");
}
}
else { // else they requested a queue but this durable was not configured as queue
throw new ArgumentException($"Existing consumer [{durable}] is not configured as a queue / deliver group.");
}
}
else if (queueName == null) {
throw new ArgumentException($"Existing consumer [{durable}] is configured as a queue / deliver group.");
}
else if (lookedUp != queueName) {
throw new ArgumentException(
$"Existing consumer deliver group {lookedUp} does not match requested queue / deliver group {queueName}.");
}

inbox = lookedUpConfig.DeliverSubject; // use the deliver subject as the inbox. It may be null, that's ok
}
else if (direct) {
throw new ArgumentException("Consumer not found for durable. Required in direct mode.");
else if (bindMode) {
throw new ArgumentException("Consumer not found for durable. Required in bind mode.");
}
}

Expand Down Expand Up @@ -324,7 +350,7 @@ public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, EventH
public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, string queue, EventHandler<MsgHandlerEventArgs> handler, bool autoAck)
{
Validator.ValidateSubject(subject, true);
queue = Validator.ValidateQueueName(queue, false);
queue = Validator.EmptyAsNull(Validator.ValidateQueueName(queue, false));
Validator.ValidateNotNull(handler, nameof(handler));
return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, queue, handler, autoAck, null, null);
}
Expand All @@ -339,7 +365,7 @@ public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, EventH
public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, string queue, EventHandler<MsgHandlerEventArgs> handler, bool autoAck, PushSubscribeOptions options)
{
Validator.ValidateSubject(subject, true);
queue = Validator.ValidateQueueName(queue, false);
queue = Validator.EmptyAsNull(Validator.ValidateQueueName(queue, false));
Validator.ValidateNotNull(handler, nameof(handler));
return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, queue, handler, autoAck, options, null);
}
Expand All @@ -359,14 +385,14 @@ public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, PushSubs
public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, string queue)
{
Validator.ValidateSubject(subject, true);
queue = Validator.ValidateQueueName(queue, false);
queue = Validator.EmptyAsNull(Validator.ValidateQueueName(queue, false));
return (IJetStreamPushSyncSubscription) CreateSubscription(subject, queue, null, false, null, null);
}

public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, string queue, PushSubscribeOptions options)
{
Validator.ValidateSubject(subject, true);
queue = Validator.ValidateQueueName(queue, false);
queue = Validator.EmptyAsNull(Validator.ValidateQueueName(queue, false));
return (IJetStreamPushSyncSubscription) CreateSubscription(subject, queue, null, false, options, null);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/JetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private StreamInfo AddOrUpdateStream(StreamConfiguration config, string addUpdat
{
Validator.ValidateNotNull(config, nameof(config));

if (string.IsNullOrEmpty(config.Name)) {
if (string.IsNullOrWhiteSpace(config.Name)) {
throw new ArgumentException("Configuration must have a valid stream name");
}

Expand Down
Loading

0 comments on commit c9ca356

Please sign in to comment.