From 09000f518dc807c87de5e58c75709094b617d6de Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 30 Aug 2021 08:50:55 -0400 Subject: [PATCH 1/4] new queue, ack format, correct empty detection --- src/NATS.Client/Internals/JsonUtils.cs | 2 +- src/NATS.Client/Internals/SimpleJson.cs | 2 +- src/NATS.Client/Internals/Validator.cs | 28 ++- src/NATS.Client/JetStream/ApiConstants.cs | 2 + src/NATS.Client/JetStream/ApiResponse.cs | 3 +- .../JetStream/ConsumerConfiguration.cs | 21 +- src/NATS.Client/JetStream/ConsumerInfo.cs | 4 + src/NATS.Client/JetStream/JetStream.cs | 68 +++++-- .../JetStream/JetStreamManagement.cs | 2 +- src/NATS.Client/JetStream/JetStreamMsg.cs | 48 +++-- src/NATS.Client/JetStream/JsPrefixManager.cs | 2 +- .../JetStream/PullSubscribeOptions.cs | 52 ++--- .../JetStream/PushSubscribeOptions.cs | 57 +++--- src/NATS.Client/JetStream/ServerInfo.cs | 3 +- src/NATS.Client/JetStream/SubscribeOptions.cs | 27 ++- .../TestJetStreamPushAsync.cs | 192 +++++++++++++++++- .../UnitTests/Internals/TestValidator.cs | 12 ++ .../UnitTests/JetStream/TestJetStreamMsg.cs | 17 +- .../JetStream/TestPushPullSubscribeOptions.cs | 102 ++++++++++ src/Tests/UnitTests/TestBase.cs | 23 +++ 20 files changed, 537 insertions(+), 130 deletions(-) diff --git a/src/NATS.Client/Internals/JsonUtils.cs b/src/NATS.Client/Internals/JsonUtils.cs index 9326b4ada..3313cda2a 100644 --- a/src/NATS.Client/Internals/JsonUtils.cs +++ b/src/NATS.Client/Internals/JsonUtils.cs @@ -50,7 +50,7 @@ internal static List OptionalStringList(JSONNode node, String field) } internal static byte[] AsByteArrayFromBase64(JSONNode node) { - return string.IsNullOrEmpty(node.Value) ? null : Convert.FromBase64String(node.Value); + return Validator.NullOrEmpty(node.Value) ? null : Convert.FromBase64String(node.Value); } internal static DateTime AsDate(JSONNode node) diff --git a/src/NATS.Client/Internals/SimpleJson.cs b/src/NATS.Client/Internals/SimpleJson.cs index afb3a5cb0..88f633653 100644 --- a/src/NATS.Client/Internals/SimpleJson.cs +++ b/src/NATS.Client/Internals/SimpleJson.cs @@ -1036,7 +1036,7 @@ internal partial class JSONString : JSONNode public override JSONNodeType Tag { get { return JSONNodeType.String; } } public override bool IsString { get { return true; } } - public override bool ShouldWrite { get { return !string.IsNullOrEmpty(m_Data); } } + public override bool ShouldWrite { get { return !Validator.NullOrEmpty(m_Data); } } public override Enumerator GetEnumerator() { return new Enumerator(); } diff --git a/src/NATS.Client/Internals/Validator.cs b/src/NATS.Client/Internals/Validator.cs index f692bc7e8..3663ad535 100644 --- a/src/NATS.Client/Internals/Validator.cs +++ b/src/NATS.Client/Internals/Validator.cs @@ -40,6 +40,28 @@ internal static string ValidateDurableRequired(string durable, ConsumerConfigura "Durable is required and cannot contain a '.', '*' or '>' [null]"); } + internal static String ValidateMustMatchIfBothSupplied(String s1, String s2, String label1, String label2) { + // s1 | s2 || result + // ---- | ---- || -------------- + // null | null || valid, null s2 + // null | y || valid, y s2 + // x | null || valid, x s1 + // x | x || valid, x s1 + // x | y || invalid + s1 = EmptyAsNull(s1); + s2 = EmptyAsNull(s2); + if (s1 == null) { + return s2; // s2 can be either null or y + } + + // x / null or x / x + if (s2 == null || s1.Equals(s2)) { + return s1; + } + + throw new ArgumentException($"{label1} [{s1}] must match the {label2} [{s2}] if both are provided."); + } + public static string Validate(string s, bool required, string label, Func check) { string preCheck = EmptyAsNull(s); @@ -276,7 +298,11 @@ private static bool NotPrintableOrHasWildGtDollar(String s) { internal static string EmptyAsNull(string s) { - return string.IsNullOrEmpty(s) ? null : s; + return NullOrEmpty(s) ? null : s; + } + + internal static bool NullOrEmpty(String s) { + return s == null || s.Trim().Length == 0; } internal static bool ZeroOrLtMinus1(long l) diff --git a/src/NATS.Client/JetStream/ApiConstants.cs b/src/NATS.Client/JetStream/ApiConstants.cs index 13a8fe38d..0299d2b72 100644 --- a/src/NATS.Client/JetStream/ApiConstants.cs +++ b/src/NATS.Client/JetStream/ApiConstants.cs @@ -35,6 +35,7 @@ internal static class ApiConstants internal const string Current = "current"; internal const string Data = "data"; internal const string Deliver = "deliver"; + internal const string DeliverGroup = "deliver_group"; internal const string DeliverPolicy = "deliver_policy"; internal const string DeliverSubject = "deliver_subject"; internal const string Delivered = "delivered"; @@ -95,6 +96,7 @@ internal static class ApiConstants internal const string Port = "port"; internal const string Proto = "proto"; internal const string Purged = "purged"; + internal const string PushBound = "push_bound"; internal const string RateLimitBps = "rate_limit_bps"; internal const string ReplayPolicy = "replay_policy"; internal const string Replica = "replica"; diff --git a/src/NATS.Client/JetStream/ApiResponse.cs b/src/NATS.Client/JetStream/ApiResponse.cs index 5125af6d2..94a09511e 100644 --- a/src/NATS.Client/JetStream/ApiResponse.cs +++ b/src/NATS.Client/JetStream/ApiResponse.cs @@ -12,6 +12,7 @@ // limitations under the License. using System.Text; +using NATS.Client.Internals; using NATS.Client.Internals.SimpleJSON; namespace NATS.Client.JetStream @@ -34,7 +35,7 @@ internal ApiResponse(string json, bool throwOnError = false) { JsonNode = JSON.Parse(json); Type = JsonNode[ApiConstants.Type].Value; - if (string.IsNullOrEmpty(Type)) + if (Validator.NullOrEmpty(Type)) { Type = NoType; } diff --git a/src/NATS.Client/JetStream/ConsumerConfiguration.cs b/src/NATS.Client/JetStream/ConsumerConfiguration.cs index 06ddd7b09..f99c53e3b 100644 --- a/src/NATS.Client/JetStream/ConsumerConfiguration.cs +++ b/src/NATS.Client/JetStream/ConsumerConfiguration.cs @@ -29,6 +29,7 @@ public sealed class ConsumerConfiguration : JsonSerializable public string Description { get; } public string Durable { get; } public string DeliverSubject { get; } + public string DeliverGroup { get; } public ulong StartSeq { get; } public DateTime StartTime { get; } public Duration AckWait { get; } @@ -51,6 +52,7 @@ internal ConsumerConfiguration(JSONNode ccNode) Description = ccNode[ApiConstants.Description].Value; Durable = ccNode[ApiConstants.DurableName].Value; DeliverSubject = ccNode[ApiConstants.DeliverSubject].Value; + DeliverGroup = ccNode[ApiConstants.DeliverGroup].Value; StartSeq = ccNode[ApiConstants.OptStartSeq].AsUlong; StartTime = JsonUtils.AsDate(ccNode[ApiConstants.OptStartTime]); AckWait = JsonUtils.AsDuration(ccNode, ApiConstants.AckWait, DefaultAckWait); @@ -66,7 +68,7 @@ internal ConsumerConfiguration(JSONNode ccNode) internal ConsumerConfiguration(string description, string durable, DeliverPolicy deliverPolicy, ulong startSeq, DateTime startTime, AckPolicy ackPolicy, Duration ackWait, long maxDeliver, string filterSubject, ReplayPolicy replayPolicy, - string sampleFrequency, long rateLimit, string deliverSubject, long maxAckPending, + string sampleFrequency, long rateLimit, string deliverSubject, string deliverGroup, long maxAckPending, Duration idleHeartbeat, bool flowControl, long maxPullWaiting) { Description = description; @@ -82,6 +84,7 @@ internal ConsumerConfiguration(string description, string durable, DeliverPolicy SampleFrequency = sampleFrequency; RateLimit = rateLimit; DeliverSubject = deliverSubject; + DeliverGroup = deliverGroup; MaxAckPending = maxAckPending; IdleHeartbeat = idleHeartbeat; FlowControl = flowControl; @@ -96,6 +99,7 @@ internal override JSONNode ToJsonNode() [ApiConstants.DurableName] = Durable, [ApiConstants.DeliverPolicy] = DeliverPolicy.GetString(), [ApiConstants.DeliverSubject] = DeliverSubject, + [ApiConstants.DeliverGroup] = DeliverGroup, [ApiConstants.OptStartSeq] = StartSeq, [ApiConstants.OptStartTime] = JsonUtils.ToString(StartTime), [ApiConstants.AckPolicy] = AckPolicy.GetString(), @@ -130,6 +134,7 @@ public sealed class ConsumerConfigurationBuilder private string _description; private string _durable; private string _deliverSubject; + private string _deliverGroup; private ulong _startSeq; private DateTime _startTime; private Duration _ackWait = Duration.OfSeconds(30); @@ -144,6 +149,7 @@ public sealed class ConsumerConfigurationBuilder public string Durable => _durable; public string DeliverSubject => _deliverSubject; + public string DeliverGroup => _deliverGroup; public string FilterSubject => _filterSubject; public long MaxAckPending => _maxAckPending; public AckPolicy AcknowledgementPolicy => _ackPolicy; @@ -166,6 +172,7 @@ public ConsumerConfigurationBuilder(ConsumerConfiguration cc) _sampleFrequency = cc.SampleFrequency; _rateLimit = cc.RateLimit; _deliverSubject = cc.DeliverSubject; + _deliverGroup = cc.DeliverGroup; _maxAckPending = cc.MaxAckPending; _idleHeartbeat = cc.IdleHeartbeat; _flowControl = cc.FlowControl; @@ -216,6 +223,17 @@ public ConsumerConfigurationBuilder WithDeliverSubject(string subject) return this; } + /// + /// Sets the group to deliver messages to. + /// + /// the delivery group. + /// The ConsumerConfigurationBuilder + public ConsumerConfigurationBuilder WithDeliverGroup(string group) + { + _deliverGroup = group; + return this; + } + /// /// Sets the start sequence of the ConsumerConfiguration. /// @@ -400,6 +418,7 @@ public ConsumerConfiguration Build() _sampleFrequency, _rateLimit, _deliverSubject, + _deliverGroup, _maxAckPending, _idleHeartbeat, _flowControl, diff --git a/src/NATS.Client/JetStream/ConsumerInfo.cs b/src/NATS.Client/JetStream/ConsumerInfo.cs index d228db969..7d31de879 100644 --- a/src/NATS.Client/JetStream/ConsumerInfo.cs +++ b/src/NATS.Client/JetStream/ConsumerInfo.cs @@ -29,6 +29,8 @@ public sealed class ConsumerInfo : ApiResponse public long NumWaiting { get; private set; } public long NumAckPending { get; private set; } public long NumRedelivered { get; private set; } + public ClusterInfo ClusterInfo { get; private set; } + public bool PushBound { get; private set; } internal ConsumerInfo(Msg msg, bool throwOnError) : base(msg, throwOnError) { @@ -57,6 +59,8 @@ private void Init(JSONNode ciNode) NumWaiting = ciNode[ApiConstants.NumWaiting].AsLong; NumAckPending = ciNode[ApiConstants.NumAckPending].AsLong; NumRedelivered = ciNode[ApiConstants.NumRedelivered].AsLong; + ClusterInfo = ClusterInfo.OptionalInstance(ciNode[ApiConstants.Cluster]); + PushBound = ciNode[ApiConstants.PushBound].AsBool; } } } diff --git a/src/NATS.Client/JetStream/JetStream.cs b/src/NATS.Client/JetStream/JetStream.cs index f54c173f6..8f51cdb88 100644 --- a/src/NATS.Client/JetStream/JetStream.cs +++ b/src/NATS.Client/JetStream/JetStream.cs @@ -146,21 +146,27 @@ Subscription CreateSubscription(string subject, string queueName, SubscribeOptions so; if (isPullMode) { - so = pullOpts; + so = pullOpts; // options must have already been checked to be non null stream = pullOpts.Stream; ccBuilder = Builder(pullOpts.ConsumerConfiguration); ccBuilder.WithDeliverSubject(null); // pull mode can't have a deliver subject + // queueName is already null + ccBuilder.WithDeliverGroup(null); // pull mode can't have a deliver group } else { - so = pushOpts == null - ? PushSubscribeOptions.Builder().Build() - : pushOpts; + so = pushOpts ?? PushSubscribeOptions.Builder().Build(); stream = so.Stream; // might be null, that's ok (see direct) ccBuilder = Builder(so.ConsumerConfiguration); + ccBuilder.WithMaxPullWaiting(0); // this does not apply to push, in fact will error b/c deliver subject will be set + // deliver subject does not have to be cleared + // figure out the queue name + queueName = Validator.ValidateMustMatchIfBothSupplied(ccBuilder.DeliverGroup, queueName, + "Consumer Configuration DeliverGroup", "Queue Name"); + ccBuilder.WithDeliverGroup(queueName); // and set it in case the deliver group was null } // - bool direct = so.Direct; + bool bindMode = so.Bind; string durable = ccBuilder.Durable; string inbox = ccBuilder.DeliverSubject; @@ -176,27 +182,47 @@ Subscription CreateSubscription(string subject, string queueName, // 2. Is this a durable or ephemeral if (durable != null) { - ConsumerInfo consumerInfo = + ConsumerInfo lookedUpInfo = LookupConsumerInfo(stream, durable); - if (consumerInfo != null) { // the consumer for that durable already exists + if (lookedUpInfo != null) { // the consumer for that durable already exists createConsumer = false; - ConsumerConfiguration cc = consumerInfo.Configuration; + ConsumerConfiguration lookedUpConfig = lookedUpInfo.Configuration; // durable already exists, make sure the filter subject matches - string existingFilterSubject = cc.FilterSubject; - if (filterSubject != null && !filterSubject.Equals(existingFilterSubject)) { + string lookedUp = Validator.EmptyAsNull(lookedUpConfig.FilterSubject); + if (filterSubject != null && !filterSubject.Equals(lookedUp)) { throw new ArgumentException( $"Subject {subject} mismatches consumer configuration {filterSubject}."); } - - filterSubject = existingFilterSubject; - - // use the deliver subject as the inbox. It may be null, that's ok - inbox = cc.DeliverSubject; + filterSubject = lookedUp; + + lookedUp = Validator.EmptyAsNull(lookedUpConfig.DeliverGroup); + if (lookedUp == null) { + // lookedUp was null, means existing consumer is not a queue consumer + if (queueName == null) { + // ok fine, no queue requested and the existing consumer is also not a queue consumer + // we must check if the consumer is in use though + if (lookedUpInfo.PushBound) { + throw new ArgumentException($"Consumer [{durable}] is already bound to a subscription."); + } + } + else { // else they requested a queue but this durable was not configured as queue + throw new ArgumentException($"Existing consumer [{durable}] is not configured as a queue / deliver group."); + } + } + else if (queueName == null) { + throw new ArgumentException($"Existing consumer [{durable}] is configured as a queue / deliver group."); + } + else if (!lookedUp.Equals(queueName)) { + throw new ArgumentException( + $"Existing consumer deliver group {lookedUp} does not match requested queue / deliver group {queueName}."); + } + + inbox = lookedUpConfig.DeliverSubject; // use the deliver subject as the inbox. It may be null, that's ok } - else if (direct) { - throw new ArgumentException("Consumer not found for durable. Required in direct mode."); + else if (bindMode) { + throw new ArgumentException("Consumer not found for durable. Required in bind mode."); } } @@ -324,7 +350,7 @@ public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, EventH public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, string queue, EventHandler handler, bool autoAck) { Validator.ValidateSubject(subject, true); - queue = Validator.ValidateQueueName(queue, false); + queue = Validator.EmptyAsNull(Validator.ValidateQueueName(queue, false)); Validator.ValidateNotNull(handler, nameof(handler)); return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, queue, handler, autoAck, null, null); } @@ -339,7 +365,7 @@ public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, EventH public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, string queue, EventHandler handler, bool autoAck, PushSubscribeOptions options) { Validator.ValidateSubject(subject, true); - queue = Validator.ValidateQueueName(queue, false); + queue = Validator.EmptyAsNull(Validator.ValidateQueueName(queue, false)); Validator.ValidateNotNull(handler, nameof(handler)); return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, queue, handler, autoAck, options, null); } @@ -359,14 +385,14 @@ public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, PushSubs public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, string queue) { Validator.ValidateSubject(subject, true); - queue = Validator.ValidateQueueName(queue, false); + queue = Validator.EmptyAsNull(Validator.ValidateQueueName(queue, false)); return (IJetStreamPushSyncSubscription) CreateSubscription(subject, queue, null, false, null, null); } public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, string queue, PushSubscribeOptions options) { Validator.ValidateSubject(subject, true); - queue = Validator.ValidateQueueName(queue, false); + queue = Validator.EmptyAsNull(Validator.ValidateQueueName(queue, false)); return (IJetStreamPushSyncSubscription) CreateSubscription(subject, queue, null, false, options, null); } } diff --git a/src/NATS.Client/JetStream/JetStreamManagement.cs b/src/NATS.Client/JetStream/JetStreamManagement.cs index 79216047c..5d58dc3bf 100644 --- a/src/NATS.Client/JetStream/JetStreamManagement.cs +++ b/src/NATS.Client/JetStream/JetStreamManagement.cs @@ -37,7 +37,7 @@ private StreamInfo AddOrUpdateStream(StreamConfiguration config, string addUpdat { Validator.ValidateNotNull(config, nameof(config)); - if (string.IsNullOrEmpty(config.Name)) { + if (Validator.NullOrEmpty(config.Name)) { throw new ArgumentException("Configuration must have a valid stream name"); } diff --git a/src/NATS.Client/JetStream/JetStreamMsg.cs b/src/NATS.Client/JetStream/JetStreamMsg.cs index 26a8ad74a..16993d75a 100644 --- a/src/NATS.Client/JetStream/JetStreamMsg.cs +++ b/src/NATS.Client/JetStream/JetStreamMsg.cs @@ -162,8 +162,6 @@ public sealed class MetaData internal string AccountHash { get; } - internal string Token { get; } - // Caller must ensure this is a JS message internal MetaData(string metaData) { @@ -175,45 +173,51 @@ internal MetaData(string metaData) int streamIndex; bool hasPending; - bool hasDomainHashToken; + bool hasDomainAndHash; if (parts.Length == 8) { streamIndex = 2; hasPending = false; - hasDomainHashToken = false; + hasDomainAndHash = false; } else if (parts.Length == 9) { streamIndex = 2; hasPending = true; - hasDomainHashToken = false; + hasDomainAndHash = false; } - else if (parts.Length >= 12) + else if (parts.Length >= 11) { streamIndex = 4; hasPending = true; - hasDomainHashToken = true; + hasDomainAndHash = true; } else { throw new NATSException($"Invalid MetaData: {metaData}"); } - Prefix = parts[0]; - // "ack" = parts[1] - Domain = hasDomainHashToken ? parts[2] : null; - AccountHash = hasDomainHashToken ? parts[3] : null; - Stream = parts[streamIndex]; - Consumer = parts[streamIndex + 1]; - NumDelivered = ulong.Parse(parts[streamIndex + 2]); - StreamSequence = ulong.Parse(parts[streamIndex + 3]); - ConsumerSequence = ulong.Parse(parts[streamIndex + 4]); - - TimestampNanos = ulong.Parse(parts[streamIndex + 5]); - Timestamp = epochTime.AddTicks((long)TimestampNanos/100); - - NumPending = hasPending ? ulong.Parse(parts[streamIndex + 6]) : 0; - Token = hasDomainHashToken ? parts[streamIndex + 7] : null; + try + { + Prefix = parts[0]; + // "ack" = parts[1] + Domain = hasDomainAndHash ? parts[2] : null; + AccountHash = hasDomainAndHash ? parts[3] : null; + Stream = parts[streamIndex]; + Consumer = parts[streamIndex + 1]; + NumDelivered = ulong.Parse(parts[streamIndex + 2]); + StreamSequence = ulong.Parse(parts[streamIndex + 3]); + ConsumerSequence = ulong.Parse(parts[streamIndex + 4]); + + TimestampNanos = ulong.Parse(parts[streamIndex + 5]); + Timestamp = epochTime.AddTicks((long)TimestampNanos / 100); + + NumPending = hasPending ? ulong.Parse(parts[streamIndex + 6]) : 0; + } + catch (Exception) + { + throw new NATSException($"Invalid MetaData: {metaData}"); + } } public override string ToString() diff --git a/src/NATS.Client/JetStream/JsPrefixManager.cs b/src/NATS.Client/JetStream/JsPrefixManager.cs index 70fef7a8c..81f26e0f2 100644 --- a/src/NATS.Client/JetStream/JsPrefixManager.cs +++ b/src/NATS.Client/JetStream/JsPrefixManager.cs @@ -22,7 +22,7 @@ internal static class JsPrefixManager private static ConcurrentDictionary JsPrefixes = new ConcurrentDictionary(); internal static string AddPrefix(string prefix) { - if (string.IsNullOrEmpty(prefix) || prefix.Equals(JetStreamConstants.JsapiPrefix)) { + if (Validator.NullOrEmpty(prefix) || prefix.Equals(JetStreamConstants.JsapiPrefix)) { return JetStreamConstants.JsapiPrefix; } diff --git a/src/NATS.Client/JetStream/PullSubscribeOptions.cs b/src/NATS.Client/JetStream/PullSubscribeOptions.cs index ef5173c99..f1cd9f0dc 100644 --- a/src/NATS.Client/JetStream/PullSubscribeOptions.cs +++ b/src/NATS.Client/JetStream/PullSubscribeOptions.cs @@ -11,8 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using NATS.Client.Internals; - namespace NATS.Client.JetStream { public sealed class PullSubscribeOptions : SubscribeOptions @@ -22,19 +20,19 @@ public sealed class PullSubscribeOptions : SubscribeOptions /// public string Durable => ConsumerConfiguration.Durable; - // Validation is done by the builder Build() - private PullSubscribeOptions(string stream, bool direct, ConsumerConfiguration config) - : base(stream, direct, config) {} + // Validation is done by base class + private PullSubscribeOptions(string stream, string durable, bool bind, ConsumerConfiguration cc) + : base(stream, durable, true, bind, null, null, cc) {} /// /// Create PushSubscribeOptions where you are binding to - /// a specific stream, specific durable and are using direct mode + /// a specific stream, specific durable and are using bind mode /// /// the stream name to bind to /// the durable name /// the PushSubscribeOptions - public static PullSubscribeOptions DirectBind(string stream, string durable) { - return new PullSubscribeOptionsBuilder().WithStream(stream).WithDurable(durable).Direct().Build(); + public static PullSubscribeOptions BindTo(string stream, string durable) { + return new PullSubscribeOptionsBuilder().WithStream(stream).WithDurable(durable).Bind(true).Build(); } /// @@ -48,30 +46,30 @@ public static PullSubscribeOptionsBuilder Builder() public sealed class PullSubscribeOptionsBuilder { - private string _durable; private string _stream; - private bool _direct; + private bool _bind; + private string _durable; private ConsumerConfiguration _config; /// - /// Set the durable + /// Set the stream name /// - /// the durable value - /// The PullSubscribeOptionsBuilder - public PullSubscribeOptionsBuilder WithDurable(string durable) + /// the stream name + /// The builder + public PullSubscribeOptionsBuilder WithStream(string stream) { - _durable = durable; + _stream = stream; return this; } /// - /// Set the stream name + /// Set the durable /// - /// the stream name - /// The builder - public PullSubscribeOptionsBuilder WithStream(string stream) + /// the durable value + /// The PullSubscribeOptionsBuilder + public PullSubscribeOptionsBuilder WithDurable(string durable) { - _stream = stream; + _durable = durable; return this; } @@ -79,9 +77,9 @@ public PullSubscribeOptionsBuilder WithStream(string stream) /// Set as a direct subscribe /// /// The builder - public PullSubscribeOptionsBuilder Direct() + public PullSubscribeOptionsBuilder Bind(bool isBind) { - _direct = true; + _bind = isBind; return this; } @@ -102,15 +100,7 @@ public PullSubscribeOptionsBuilder WithConfiguration(ConsumerConfiguration confi /// The PullSubscribeOptions object. public PullSubscribeOptions Build() { - _stream = Validator.ValidateStreamName(_stream, false); - - _durable = Validator.ValidateDurableRequired(_durable, _config); - - _config = ConsumerConfiguration.Builder(_config) - .WithDurable(_durable) - .Build(); - - return new PullSubscribeOptions(_stream, _direct, _config); + return new PullSubscribeOptions(_stream, _durable, _bind, _config); } } } diff --git a/src/NATS.Client/JetStream/PushSubscribeOptions.cs b/src/NATS.Client/JetStream/PushSubscribeOptions.cs index 99c61f55d..ee1aeb63d 100644 --- a/src/NATS.Client/JetStream/PushSubscribeOptions.cs +++ b/src/NATS.Client/JetStream/PushSubscribeOptions.cs @@ -11,8 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using NATS.Client.Internals; - namespace NATS.Client.JetStream { public sealed class PushSubscribeOptions : SubscribeOptions @@ -27,9 +25,15 @@ public sealed class PushSubscribeOptions : SubscribeOptions /// public string DeliverSubject => ConsumerConfiguration.DeliverSubject; - // Validation is done by the builder Build() - private PushSubscribeOptions(string stream, bool direct, ConsumerConfiguration config) - : base(stream, direct, config) {} + /// + /// Gets the deliver group + /// + public string DeliverGroup => ConsumerConfiguration.DeliverGroup; + + // Validation is done by base class + private PushSubscribeOptions(string stream, string durable, bool bind, + string deliverSubject, string deliverGroup, ConsumerConfiguration cc) + : base(stream, durable, false, bind, deliverSubject, deliverGroup, cc) {} /// /// Create PushSubscribeOptions where you are binding to @@ -37,7 +41,7 @@ private PushSubscribeOptions(string stream, bool direct, ConsumerConfiguration c /// /// the stream name to bind to /// the PushSubscribeOptions - public static PushSubscribeOptions Bind(string stream) { + public static PushSubscribeOptions ForStream(string stream) { return new PushSubscribeOptionsBuilder().WithStream(stream).Build(); } @@ -48,8 +52,8 @@ public static PushSubscribeOptions Bind(string stream) { /// the stream name to bind to /// the durable name /// the PushSubscribeOptions - public static PushSubscribeOptions DirectBind(string stream, string durable) { - return new PushSubscribeOptionsBuilder().WithStream(stream).WithDurable(durable).Direct().Build(); + public static PushSubscribeOptions BindTo(string stream, string durable) { + return new PushSubscribeOptionsBuilder().WithStream(stream).WithDurable(durable).Bind().Build(); } /// @@ -64,11 +68,12 @@ public static PushSubscribeOptionsBuilder Builder() { public sealed class PushSubscribeOptionsBuilder { - private string _durable; - private string _deliverSubject; private string _stream; - private bool _direct; + private bool _bind; + private string _durable; private ConsumerConfiguration _config; + private string _deliverSubject; + private string _deliverGroup; /// /// Set the stream name @@ -85,9 +90,9 @@ public PushSubscribeOptionsBuilder WithStream(string stream) /// Set as a direct subscribe /// /// The builder - public PushSubscribeOptionsBuilder Direct() + public PushSubscribeOptionsBuilder Bind() { - _direct = true; + _bind = true; return this; } @@ -124,26 +129,24 @@ public PushSubscribeOptionsBuilder WithDeliverSubject(string deliverSubject) return this; } + /// + /// Set the deliver group + /// + /// the deliver group value + /// The PushSubscribeOptionsBuilder + public PushSubscribeOptionsBuilder WithDeliverGroup(string deliverGroup) + { + _deliverGroup = deliverGroup; + return this; + } + /// /// Builds the PushSubscribeOptions /// /// The PushSubscribeOptions object. public PushSubscribeOptions Build() { - _stream = Validator.ValidateStreamName(_stream, false); - - _durable = Validator.ValidateDurable(_durable, false); - if (_durable == null && _config != null) - { - _durable = Validator.ValidateDurable(_config.Durable, false); - } - - _config = ConsumerConfiguration.Builder(_config) - .WithDurable(_durable) - .WithDeliverSubject(_deliverSubject) - .Build(); - - return new PushSubscribeOptions(_stream, _direct, _config); + return new PushSubscribeOptions(_stream, _durable, _bind, _deliverSubject, _deliverGroup, _config); } } } diff --git a/src/NATS.Client/JetStream/ServerInfo.cs b/src/NATS.Client/JetStream/ServerInfo.cs index d9f6735a7..d05032a5e 100644 --- a/src/NATS.Client/JetStream/ServerInfo.cs +++ b/src/NATS.Client/JetStream/ServerInfo.cs @@ -13,6 +13,7 @@ using System.Linq; using System.Text; +using NATS.Client.Internals; using NATS.Client.Internals.SimpleJSON; namespace NATS.Client.JetStream @@ -61,7 +62,7 @@ public ServerInfo(string json) ClientIp = siNode[ApiConstants.ClientIp].Value; Cluster = siNode[ApiConstants.Cluster].Value; ConnectURLs = siNode[ApiConstants.ConnectUrls].Children - .Where(n => !string.IsNullOrEmpty(n.Value)) + .Where(n => !Validator.NullOrEmpty(n.Value)) .Select(n => n.Value) .ToArray(); } diff --git a/src/NATS.Client/JetStream/SubscribeOptions.cs b/src/NATS.Client/JetStream/SubscribeOptions.cs index 2d7adaa4d..c61d07f42 100644 --- a/src/NATS.Client/JetStream/SubscribeOptions.cs +++ b/src/NATS.Client/JetStream/SubscribeOptions.cs @@ -11,24 +11,37 @@ // See the License for the specific language governing permissions and // limitations under the License. +using NATS.Client.Internals; + namespace NATS.Client.JetStream { /// /// The base class for all Subscribe Options containing a stream and /// consumer configuration. /// - public class SubscribeOptions + public abstract class SubscribeOptions { internal string Stream { get; } - internal bool Direct { get; } + internal bool Bind { get; } internal ConsumerConfiguration ConsumerConfiguration { get;} - internal SubscribeOptions(string stream, bool direct, ConsumerConfiguration configuration) + internal SubscribeOptions(string stream, string durable, bool pull, bool bind, + string deliverSubject, string deliverGroup, ConsumerConfiguration cc) { - // THESE ARE ALREADY BE VALIDATED BY PRIVATE CONSTRUCTORS THAT CALL THIS base METHOD! - Stream = stream; - Direct = direct; - ConsumerConfiguration = configuration; + Stream = Validator.ValidateStreamName(stream, bind); + + durable = Validator.ValidateMustMatchIfBothSupplied(durable, cc?.Durable, "Builder Durable", "Consumer Configuration Durable"); + durable = Validator.ValidateDurable(durable, pull || bind); + + deliverGroup = Validator.ValidateMustMatchIfBothSupplied(deliverGroup, cc?.DeliverGroup, "Builder Deliver Group", "Consumer Configuration Deliver Group"); + + ConsumerConfiguration = ConsumerConfiguration.Builder(cc) + .WithDurable(durable) + .WithDeliverSubject(deliverSubject) + .WithDeliverGroup(deliverGroup) + .Build(); + + Bind = bind; } } } diff --git a/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs b/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs index c314f7928..7ecfb3c02 100644 --- a/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs +++ b/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs @@ -11,9 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System; +using System.Collections.Generic; +using System.Text; using System.Threading; using NATS.Client; using NATS.Client.JetStream; +using UnitTests; using Xunit; using Xunit.Abstractions; using static UnitTests.TestBase; @@ -46,7 +50,7 @@ public void TestHandlerSub() CountdownEvent latch = new CountdownEvent(10); int received = 0; - + void TestHandler(object sender, MsgHandlerEventArgs args) { received++; @@ -64,7 +68,7 @@ void TestHandler(object sender, MsgHandlerEventArgs args) // Wait for messages to arrive using the countdown latch. latch.Wait(); - + Assert.Equal(10, received); }); } @@ -86,13 +90,13 @@ public void TestHandlerAutoAck() // 1. auto ack true CountdownEvent latch1 = new CountdownEvent(10); int handlerReceived1 = 0; - + // create our message handler, does not ack void Handler1(object sender, MsgHandlerEventArgs args) { handlerReceived1++; latch1.Signal(); - } + } // subscribe using the handler, auto ack true PushSubscribeOptions pso1 = PushSubscribeOptions.Builder() @@ -101,17 +105,17 @@ void Handler1(object sender, MsgHandlerEventArgs args) // wait for messages to arrive using the countdown latch. latch1.Wait(); - + Assert.Equal(10, handlerReceived1); - + // check that all the messages were read by the durable IJetStreamPushSyncSubscription sub = js.PushSubscribeSync(SUBJECT, pso1); AssertNoMoreMessages(sub); - + // 2. auto ack false CountdownEvent latch2 = new CountdownEvent(10); int handlerReceived2 = 0; - + // create our message handler, also does not ack void Handler2(object sender, MsgHandlerEventArgs args) { @@ -128,13 +132,181 @@ void Handler2(object sender, MsgHandlerEventArgs args) // wait for messages to arrive using the countdown latch. latch2.Wait(); Assert.Equal(10, handlerReceived2); - + Thread.Sleep(2000); // just give it time for the server to realize the messages are not ack'ed - + // check that we get all the messages again sub = js.PushSubscribeSync(SUBJECT, pso2); Assert.Equal(10, ReadMessagesAck(sub).Count); }); } + + [Fact] + public void TestQueueSubWorkflow() + { + Context.RunInJsServer(c => + { + // create the stream. + CreateMemoryStream(c, STREAM, SUBJECT); + + // Create our JetStream context to receive JetStream messages. + IJetStream js = c.CreateJetStreamContext(); + + // Setup the subscribers + // - the PushSubscribeOptions can be re-used since all the subscribers are the same + // - use a concurrent integer to track all the messages received + // - have a list of subscribers and threads so I can track them + PushSubscribeOptions pso = PushSubscribeOptions.Builder().WithDurable(DURABLE).Build(); + InterlockedLong allReceived = new InterlockedLong(); + IList subscribers = new List(); + IList subThreads = new List(); + for (int id = 1; id <= 3; id++) { + // setup the subscription + IJetStreamPushSyncSubscription sub = js.PushSubscribeSync(SUBJECT, QUEUE, pso); + // create and track the runnable + JsQueueSubscriber qs = new JsQueueSubscriber(100, js, sub, allReceived); + subscribers.Add(qs); + // create, track and start the thread + Thread t = new Thread(qs.Run); + subThreads.Add(t); + t.Start(); + } + c.Flush(DefaultTimeout); // flush outgoing communication with/to the server + + // create and start the publishing + Thread pubThread = new Thread(new JsPublisher(js, 100).Run); + pubThread.Start(); + + // wait for all threads to finish + pubThread.Join(5000); + foreach (Thread t in subThreads) { + t.Join(5000); + } + + ISet uniqueDatas = new HashSet(); + // count + int count = 0; + foreach (JsQueueSubscriber qs in subscribers) { + int r = qs.received; + Assert.True(r > 0); + count += r; + foreach (string s in qs.datas) { + Assert.True(uniqueDatas.Add(s)); + } + } + + Assert.Equal(100, count); + + }); + } + + [Fact] + public void TestQueueSubErrors() + { + Context.RunInJsServer(c => + { + // create the stream. + CreateMemoryStream(c, STREAM, SUBJECT); + + // Create our JetStream context to receive JetStream messages. + IJetStream js = c.CreateJetStreamContext(); + + // create a durable that is not a queue + PushSubscribeOptions pso1 = PushSubscribeOptions.Builder().WithDurable(Durable(1)).Build(); + js.PushSubscribeSync(SUBJECT, pso1); + + ArgumentException iae = Assert.Throws(() => js.PushSubscribeSync(SUBJECT, pso1)); + String expected = $"Consumer [{Durable(1)}] is already bound to a subscription."; + Assert.Equal(expected, iae.Message); + + iae = Assert.Throws(() => js.PushSubscribeSync(SUBJECT, Queue(1), pso1)); + expected = $"Existing consumer [{Durable(1)}] is not configured as a queue / deliver group."; + Assert.Equal(expected, iae.Message); + + PushSubscribeOptions pso21 = PushSubscribeOptions.Builder().WithDurable(Durable(2)).Build(); + js.PushSubscribeSync(SUBJECT, Queue(21), pso21); + + PushSubscribeOptions pso22 = PushSubscribeOptions.Builder().WithDurable(Durable(2)).Build(); + iae = Assert.Throws(() => js.PushSubscribeSync(SUBJECT, Queue(22), pso22)); + expected = $"Existing consumer deliver group {Queue(21)} does not match requested queue / deliver group {Queue(22)}."; + Assert.Equal(expected, iae.Message); + + PushSubscribeOptions pso23 = PushSubscribeOptions.Builder().WithDurable(Durable(2)).Build(); + iae = Assert.Throws(() => js.PushSubscribeSync(SUBJECT, pso23)); + expected = $"Existing consumer [{Durable(2)}] is configured as a queue / deliver group."; + Assert.Equal(expected, iae.Message); + + PushSubscribeOptions pso3 = PushSubscribeOptions.Builder() + .WithDurable(Durable(3)) + .WithDeliverGroup(Queue(31)) + .Build(); + iae = Assert.Throws(() => js.PushSubscribeSync(SUBJECT, Queue(32), pso3)); + expected = $"Consumer Configuration DeliverGroup [{Queue(31)}] must match the Queue Name [{Queue(32)}] if both are provided."; + Assert.Equal(expected, iae.Message); + }); + } + } + + class JsPublisher + { + IJetStream js; + int msgCount; + + public JsPublisher(IJetStream js, int msgCount) + { + this.js = js; + this.msgCount = msgCount; + } + + public void Run() + { + for (int x = 1; x <= msgCount; x++) + { + js.Publish(SUBJECT, Encoding.ASCII.GetBytes("Data # " + x)); + } + } + } + + class JsQueueSubscriber + { + int msgCount; + IJetStream js; + IJetStreamPushSyncSubscription sub; + InterlockedLong allReceived; + public int received; + public IList datas; + + public JsQueueSubscriber(int msgCount, IJetStream js, IJetStreamPushSyncSubscription sub, InterlockedLong allReceived) + { + this.msgCount = msgCount; + this.js = js; + this.sub = sub; + this.allReceived = allReceived; + received = 0; + datas = new List(); + } + + public void Run() + { + while (allReceived.Get() < msgCount) + { + try + { + Msg msg = sub.NextMessage(500); + while (msg != null) + { + received++; + allReceived.Inc(); + datas.Add(Encoding.UTF8.GetString(msg.Data)); + msg.Ack(); + msg = sub.NextMessage(500); + } + } + catch (NATSTimeoutException e) + { + // timeout is acceptable, means no messages available. + } + } + } } } diff --git a/src/Tests/UnitTests/Internals/TestValidator.cs b/src/Tests/UnitTests/Internals/TestValidator.cs index 20ac84b1a..d9c54c6ed 100644 --- a/src/Tests/UnitTests/Internals/TestValidator.cs +++ b/src/Tests/UnitTests/Internals/TestValidator.cs @@ -175,6 +175,18 @@ public void TestValidateJetStreamPrefix() Assert.Throws(() => Validator.ValidateJetStreamPrefix(HasLow)); } + [Fact] + public void TestValidateMustMatchIfBothSupplied() + { + Assert.Null(Validator.ValidateMustMatchIfBothSupplied(null, null, "", "")); + Assert.Equal("y", Validator.ValidateMustMatchIfBothSupplied(null, "y", "", "")); + Assert.Equal("y", Validator.ValidateMustMatchIfBothSupplied("", "y", "", "")); + Assert.Equal("x", Validator.ValidateMustMatchIfBothSupplied("x", null, "", "")); + Assert.Equal("x", Validator.ValidateMustMatchIfBothSupplied("x", " ", "", "")); + Assert.Equal("x", Validator.ValidateMustMatchIfBothSupplied("x", "x", "", "")); + Assert.Throws(() => Validator.ValidateMustMatchIfBothSupplied("x", "y", "", "")); + } + [Fact] public void TestNotNull() { diff --git a/src/Tests/UnitTests/JetStream/TestJetStreamMsg.cs b/src/Tests/UnitTests/JetStream/TestJetStreamMsg.cs index c91537d6f..454e91dbd 100644 --- a/src/Tests/UnitTests/JetStream/TestJetStreamMsg.cs +++ b/src/Tests/UnitTests/JetStream/TestJetStreamMsg.cs @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +using NATS.Client; using NATS.Client.JetStream; using Xunit; @@ -20,7 +21,12 @@ public class TestJetStreamMsg : TestBase { const string TestMetaV0 = "$JS.ACK.test-stream.test-consumer.1.2.3.1605139610113260000"; const string TestMetaV1 = "$JS.ACK.test-stream.test-consumer.1.2.3.1605139610113260000.4"; - const string TestMetaV2 = "$JS.ACK.v2Domain.v2Hash.test-stream.test-consumer.1.2.3.1605139610113260000.4.v2Token"; + const string TestMetaV2 = "$JS.ACK.v2Domain.v2Hash.test-stream.test-consumer.1.2.3.1605139610113260000.4"; + const string TestMetaVFuture = "$JS.ACK.v2Domain.v2Hash.test-stream.test-consumer.1.2.3.1605139610113260000.4.dont.care.how.many.more"; + const string InvalidMetaNoAck = "$JS.nope.test-stream.test-consumer.1.2.3.1605139610113260000"; + const string InvalidMetaLt8Tokens = "$JS.ACK.less-than.8-tokens.1.2.3"; + const string InvalidMeta10Tokens = "$JS.ACK.v2Domain.v2Hash.test-stream.test-consumer.1.2.3.1605139610113260000"; + const string InvalidMetaData = "$JS.ACK.v2Domain.v2Hash.test-stream.test-consumer.1.2.3.1605139610113260000.not-a-number"; [Fact] public void MetaDataTests() @@ -28,6 +34,12 @@ public void MetaDataTests() ValidateMeta(false, false, new MetaData(TestMetaV0)); ValidateMeta(true, false, new MetaData(TestMetaV1)); ValidateMeta(true, true, new MetaData(TestMetaV2)); + ValidateMeta(true, true, new MetaData(TestMetaVFuture)); + + Assert.Throws(() => ValidateMeta(true, true, new MetaData(InvalidMetaNoAck))); + Assert.Throws(() => ValidateMeta(true, true, new MetaData(InvalidMetaLt8Tokens))); + Assert.Throws(() => ValidateMeta(true, true, new MetaData(InvalidMeta10Tokens))); + Assert.Throws(() => ValidateMeta(true, true, new MetaData(InvalidMetaData))); } private static void ValidateMeta(bool hasPending, bool hasDomainHashToken, MetaData meta) @@ -47,13 +59,10 @@ private static void ValidateMeta(bool hasPending, bool hasDomainHashToken, MetaD if (hasDomainHashToken) { Assert.Equal("v2Domain", meta.Domain); Assert.Equal("v2Hash", meta.AccountHash); - Assert.Equal("v2Token", meta.Token); - } else { Assert.Null(meta.Domain); Assert.Null(meta.AccountHash); - Assert.Null(meta.Token); } } } diff --git a/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs b/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs index 404503eb7..6e686afa8 100644 --- a/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs +++ b/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs @@ -100,5 +100,107 @@ public void TestPullValidation() ConsumerConfiguration cc = ConsumerConfiguration.Builder().WithDurable(DURABLE).Build(); PullSubscribeOptions.Builder().WithConfiguration(cc).Build(); } + + [Fact] + public void TestDurableValidation() + { + // push + Assert.Null(PushSubscribeOptions.Builder() + .WithDurable(null) + .WithConfiguration(ConsumerConfiguration.Builder().WithDurable(null).Build()) + .Build() + .Durable); + + Assert.Equal("y", PushSubscribeOptions.Builder() + .WithDurable(null) + .WithConfiguration(ConsumerConfiguration.Builder().WithDurable("y").Build()) + .Build() + .Durable); + + Assert.Equal("x", PushSubscribeOptions.Builder() + .WithDurable("x") + .WithConfiguration(ConsumerConfiguration.Builder().WithDurable(null).Build()) + .Build() + .Durable); + + Assert.Equal("x", PushSubscribeOptions.Builder() + .WithDurable("x") + .WithConfiguration(ConsumerConfiguration.Builder().WithDurable("x").Build()) + .Build() + .Durable); + + Assert.Throws(() => PushSubscribeOptions.Builder() + .WithDurable("x") + .WithConfiguration(ConsumerConfiguration.Builder().WithDurable("y").Build()) + .Build()); + + Assert.Null(PushSubscribeOptions.Builder().Build().Durable); + + // pull + Assert.Throws(() => PullSubscribeOptions.Builder() + .WithDurable(null) + .WithConfiguration(ConsumerConfiguration.Builder().WithDurable(null).Build()) + .Build() + .Durable); + + Assert.Equal("y", PullSubscribeOptions.Builder() + .WithDurable(null) + .WithConfiguration(ConsumerConfiguration.Builder().WithDurable("y").Build()) + .Build() + .Durable); + + Assert.Equal("x", PullSubscribeOptions.Builder() + .WithDurable("x") + .WithConfiguration(ConsumerConfiguration.Builder().WithDurable(null).Build()) + .Build() + .Durable); + + Assert.Equal("x", PullSubscribeOptions.Builder() + .WithDurable("x") + .WithConfiguration(ConsumerConfiguration.Builder().WithDurable("x").Build()) + .Build() + .Durable); + + Assert.Throws(() => PullSubscribeOptions.Builder() + .WithDurable("x") + .WithConfiguration(ConsumerConfiguration.Builder().WithDurable("y").Build()) + .Build()); + + Assert.Throws(() => PullSubscribeOptions.Builder().Build()); + } + + [Fact] + public void TestDeliverGroupValidation() + { + Assert.Null(PushSubscribeOptions.Builder() + .WithDeliverGroup(null) + .WithConfiguration(ConsumerConfiguration.Builder().WithDeliverGroup(null).Build()) + .Build() + .DeliverGroup); + + Assert.Equal("y", PushSubscribeOptions.Builder() + .WithDeliverGroup(null) + .WithConfiguration(ConsumerConfiguration.Builder().WithDeliverGroup("y").Build()) + .Build() + .DeliverGroup); + + Assert.Equal("x", PushSubscribeOptions.Builder() + .WithDeliverGroup("x") + .WithConfiguration(ConsumerConfiguration.Builder().WithDeliverGroup(null).Build()) + .Build() + .DeliverGroup); + + Assert.Equal("x", PushSubscribeOptions.Builder() + .WithDeliverGroup("x") + .WithConfiguration(ConsumerConfiguration.Builder().WithDeliverGroup("x").Build()) + .Build() + .DeliverGroup); + + Assert.Throws(() => PushSubscribeOptions.Builder() + .WithDeliverGroup("x") + .WithConfiguration(ConsumerConfiguration.Builder().WithDeliverGroup("y").Build()) + .Build()); + + } } } diff --git a/src/Tests/UnitTests/TestBase.cs b/src/Tests/UnitTests/TestBase.cs index 39d51f6a4..b1e4aa8f0 100644 --- a/src/Tests/UnitTests/TestBase.cs +++ b/src/Tests/UnitTests/TestBase.cs @@ -1,6 +1,7 @@ using System; using System.IO; using System.Text; +using System.Threading; namespace UnitTests { @@ -104,4 +105,26 @@ public static byte[] DataBytes(int seq) { return Encoding.ASCII.GetBytes(Data(seq)); } } + + public class InterlockedLong + { + private long count = 0; + + public InterlockedLong() {} + + public InterlockedLong(long count) + { + this.count = count; + } + + public void Inc() + { + Interlocked.Increment(ref count); + } + + public long Get() + { + return Interlocked.Read(ref count); + } + } } \ No newline at end of file From 35498c8ce4fe7fec968a804a5f347bf39b7e4b00 Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 30 Aug 2021 09:00:47 -0400 Subject: [PATCH 2/4] doesn't like if you have an unused var --- src/Tests/IntegrationTests/TestJetStreamPushAsync.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs b/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs index 7ecfb3c02..ce4c5a2bf 100644 --- a/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs +++ b/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs @@ -302,7 +302,7 @@ public void Run() msg = sub.NextMessage(500); } } - catch (NATSTimeoutException e) + catch (NATSTimeoutException) { // timeout is acceptable, means no messages available. } From f7a7803d48b0c70ebe139c5f6717a703cc8622c5 Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 30 Aug 2021 11:32:50 -0400 Subject: [PATCH 3/4] TestHandlerAutoAck must unsub --- .../IntegrationTests/TestJetStreamPushAsync.cs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs b/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs index ce4c5a2bf..2ceda2127 100644 --- a/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs +++ b/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs @@ -101,16 +101,18 @@ void Handler1(object sender, MsgHandlerEventArgs args) // subscribe using the handler, auto ack true PushSubscribeOptions pso1 = PushSubscribeOptions.Builder() .WithDurable(Durable(1)).Build(); - js.PushSubscribeAsync(SUBJECT, Handler1, true, pso1); + IJetStreamPushAsyncSubscription asub = js.PushSubscribeAsync(SUBJECT, Handler1, true, pso1); // wait for messages to arrive using the countdown latch. latch1.Wait(); Assert.Equal(10, handlerReceived1); + + asub.Unsubscribe(); // check that all the messages were read by the durable - IJetStreamPushSyncSubscription sub = js.PushSubscribeSync(SUBJECT, pso1); - AssertNoMoreMessages(sub); + IJetStreamPushSyncSubscription ssub = js.PushSubscribeSync(SUBJECT, pso1); + AssertNoMoreMessages(ssub); // 2. auto ack false CountdownEvent latch2 = new CountdownEvent(10); @@ -127,7 +129,7 @@ void Handler2(object sender, MsgHandlerEventArgs args) ConsumerConfiguration cc = ConsumerConfiguration.Builder().WithAckWait(500).Build(); PushSubscribeOptions pso2 = PushSubscribeOptions.Builder() .WithDurable(Durable(2)).WithConfiguration(cc).Build(); - js.PushSubscribeAsync(SUBJECT, Handler2, false, pso2); + asub = js.PushSubscribeAsync(SUBJECT, Handler2, false, pso2); // wait for messages to arrive using the countdown latch. latch2.Wait(); @@ -135,9 +137,11 @@ void Handler2(object sender, MsgHandlerEventArgs args) Thread.Sleep(2000); // just give it time for the server to realize the messages are not ack'ed + asub.Unsubscribe(); + // check that we get all the messages again - sub = js.PushSubscribeSync(SUBJECT, pso2); - Assert.Equal(10, ReadMessagesAck(sub).Count); + ssub = js.PushSubscribeSync(SUBJECT, pso2); + Assert.Equal(10, ReadMessagesAck(ssub).Count); }); } From d12ea3218d9a27d0733f1610d3666b83c94b0667 Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 30 Aug 2021 12:07:08 -0400 Subject: [PATCH 4/4] addressed comments --- src/NATS.Client/Internals/JsonUtils.cs | 2 +- src/NATS.Client/Internals/SimpleJson.cs | 2 +- src/NATS.Client/Internals/Validator.cs | 6 +----- src/NATS.Client/JetStream/ApiResponse.cs | 3 +-- src/NATS.Client/JetStream/JetStream.cs | 2 +- src/NATS.Client/JetStream/JetStreamManagement.cs | 2 +- src/NATS.Client/JetStream/JsPrefixManager.cs | 2 +- src/NATS.Client/JetStream/ServerInfo.cs | 3 +-- 8 files changed, 8 insertions(+), 14 deletions(-) diff --git a/src/NATS.Client/Internals/JsonUtils.cs b/src/NATS.Client/Internals/JsonUtils.cs index 3313cda2a..2ac581040 100644 --- a/src/NATS.Client/Internals/JsonUtils.cs +++ b/src/NATS.Client/Internals/JsonUtils.cs @@ -50,7 +50,7 @@ internal static List OptionalStringList(JSONNode node, String field) } internal static byte[] AsByteArrayFromBase64(JSONNode node) { - return Validator.NullOrEmpty(node.Value) ? null : Convert.FromBase64String(node.Value); + return string.IsNullOrWhiteSpace(node.Value) ? null : Convert.FromBase64String(node.Value); } internal static DateTime AsDate(JSONNode node) diff --git a/src/NATS.Client/Internals/SimpleJson.cs b/src/NATS.Client/Internals/SimpleJson.cs index 88f633653..7826f335a 100644 --- a/src/NATS.Client/Internals/SimpleJson.cs +++ b/src/NATS.Client/Internals/SimpleJson.cs @@ -1036,7 +1036,7 @@ internal partial class JSONString : JSONNode public override JSONNodeType Tag { get { return JSONNodeType.String; } } public override bool IsString { get { return true; } } - public override bool ShouldWrite { get { return !Validator.NullOrEmpty(m_Data); } } + public override bool ShouldWrite { get { return !string.IsNullOrWhiteSpace(m_Data); } } public override Enumerator GetEnumerator() { return new Enumerator(); } diff --git a/src/NATS.Client/Internals/Validator.cs b/src/NATS.Client/Internals/Validator.cs index 3663ad535..433173af7 100644 --- a/src/NATS.Client/Internals/Validator.cs +++ b/src/NATS.Client/Internals/Validator.cs @@ -298,11 +298,7 @@ private static bool NotPrintableOrHasWildGtDollar(String s) { internal static string EmptyAsNull(string s) { - return NullOrEmpty(s) ? null : s; - } - - internal static bool NullOrEmpty(String s) { - return s == null || s.Trim().Length == 0; + return string.IsNullOrWhiteSpace(s) ? null : s; } internal static bool ZeroOrLtMinus1(long l) diff --git a/src/NATS.Client/JetStream/ApiResponse.cs b/src/NATS.Client/JetStream/ApiResponse.cs index 94a09511e..b20ce53b5 100644 --- a/src/NATS.Client/JetStream/ApiResponse.cs +++ b/src/NATS.Client/JetStream/ApiResponse.cs @@ -12,7 +12,6 @@ // limitations under the License. using System.Text; -using NATS.Client.Internals; using NATS.Client.Internals.SimpleJSON; namespace NATS.Client.JetStream @@ -35,7 +34,7 @@ internal ApiResponse(string json, bool throwOnError = false) { JsonNode = JSON.Parse(json); Type = JsonNode[ApiConstants.Type].Value; - if (Validator.NullOrEmpty(Type)) + if (string.IsNullOrWhiteSpace(Type)) { Type = NoType; } diff --git a/src/NATS.Client/JetStream/JetStream.cs b/src/NATS.Client/JetStream/JetStream.cs index 8f51cdb88..ba24cb674 100644 --- a/src/NATS.Client/JetStream/JetStream.cs +++ b/src/NATS.Client/JetStream/JetStream.cs @@ -214,7 +214,7 @@ Subscription CreateSubscription(string subject, string queueName, else if (queueName == null) { throw new ArgumentException($"Existing consumer [{durable}] is configured as a queue / deliver group."); } - else if (!lookedUp.Equals(queueName)) { + else if (lookedUp != queueName) { throw new ArgumentException( $"Existing consumer deliver group {lookedUp} does not match requested queue / deliver group {queueName}."); } diff --git a/src/NATS.Client/JetStream/JetStreamManagement.cs b/src/NATS.Client/JetStream/JetStreamManagement.cs index 5d58dc3bf..f71f99b08 100644 --- a/src/NATS.Client/JetStream/JetStreamManagement.cs +++ b/src/NATS.Client/JetStream/JetStreamManagement.cs @@ -37,7 +37,7 @@ private StreamInfo AddOrUpdateStream(StreamConfiguration config, string addUpdat { Validator.ValidateNotNull(config, nameof(config)); - if (Validator.NullOrEmpty(config.Name)) { + if (string.IsNullOrWhiteSpace(config.Name)) { throw new ArgumentException("Configuration must have a valid stream name"); } diff --git a/src/NATS.Client/JetStream/JsPrefixManager.cs b/src/NATS.Client/JetStream/JsPrefixManager.cs index 81f26e0f2..23b83dbf6 100644 --- a/src/NATS.Client/JetStream/JsPrefixManager.cs +++ b/src/NATS.Client/JetStream/JsPrefixManager.cs @@ -22,7 +22,7 @@ internal static class JsPrefixManager private static ConcurrentDictionary JsPrefixes = new ConcurrentDictionary(); internal static string AddPrefix(string prefix) { - if (Validator.NullOrEmpty(prefix) || prefix.Equals(JetStreamConstants.JsapiPrefix)) { + if (string.IsNullOrWhiteSpace(prefix) || prefix.Equals(JetStreamConstants.JsapiPrefix)) { return JetStreamConstants.JsapiPrefix; } diff --git a/src/NATS.Client/JetStream/ServerInfo.cs b/src/NATS.Client/JetStream/ServerInfo.cs index d05032a5e..2150b9154 100644 --- a/src/NATS.Client/JetStream/ServerInfo.cs +++ b/src/NATS.Client/JetStream/ServerInfo.cs @@ -13,7 +13,6 @@ using System.Linq; using System.Text; -using NATS.Client.Internals; using NATS.Client.Internals.SimpleJSON; namespace NATS.Client.JetStream @@ -62,7 +61,7 @@ public ServerInfo(string json) ClientIp = siNode[ApiConstants.ClientIp].Value; Cluster = siNode[ApiConstants.Cluster].Value; ConnectURLs = siNode[ApiConstants.ConnectUrls].Children - .Where(n => !Validator.NullOrEmpty(n.Value)) + .Where(n => !string.IsNullOrWhiteSpace(n.Value)) .Select(n => n.Value) .ToArray(); }