From 9ad54302c8a442a54eb8fac5e65138dbcca3e18a Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 10 Apr 2024 13:41:26 -0400 Subject: [PATCH 1/3] KV Watch Multiple Filters --- src/NATS.Client/Internals/Validator.cs | 11 +++++++- src/NATS.Client/KeyValue/IKeyValue.cs | 27 ++++++++++++++++--- src/NATS.Client/KeyValue/KeyValue.cs | 26 +++++++++++------- .../KeyValue/KeyValueWatchSubscription.cs | 15 +++++++---- src/Tests/IntegrationTests/TestKeyValue.cs | 10 +++++++ 5 files changed, 70 insertions(+), 19 deletions(-) diff --git a/src/NATS.Client/Internals/Validator.cs b/src/NATS.Client/Internals/Validator.cs index 79db2c3ff..1817277b8 100644 --- a/src/NATS.Client/Internals/Validator.cs +++ b/src/NATS.Client/Internals/Validator.cs @@ -158,7 +158,16 @@ public static string ValidatePrefixOrDomain(string s, string label, bool require }); } - public static string ValidateKvKeyWildcardAllowedRequired(string s) { + public static IList ValidateKvKeysWildcardAllowedRequired(IList keys) { + Required(keys, "Key"); + foreach (string key in keys) { + ValidateWildcardKvKey(key, "Key", true); + } + return keys; + } + + public static string ValidateKvKeyWildcardAllowedRequired(string s) + { return ValidateWildcardKvKey(s, "Key", true); } diff --git a/src/NATS.Client/KeyValue/IKeyValue.cs b/src/NATS.Client/KeyValue/IKeyValue.cs index 36e7838ad..4ca04cfaf 100644 --- a/src/NATS.Client/KeyValue/IKeyValue.cs +++ b/src/NATS.Client/KeyValue/IKeyValue.cs @@ -108,24 +108,43 @@ public interface IKeyValue void Purge(string key, ulong expectedRevision); /// - /// Watch updates for a specific key + /// Watch updates for a specific key or keys. /// - /// the key + /// the key or a comma delimited list of keys /// the watcher /// the watch options to apply. If multiple conflicting options are supplied, the last options wins. /// KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions); /// - /// Watch updates for a specific key, starting from a specific revision + /// Watch updates for a specific key or keys, starting from a specific revision. /// - /// the key + /// the key or a comma delimited list of keys /// the watcher /// the revision to start from /// the watch options to apply. If multiple conflicting options are supplied, the last options wins. /// KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions); + /// + /// Watch updates for a specific keys. + /// + /// the keys + /// the watcher + /// the watch options to apply. If multiple conflicting options are supplied, the last options wins. + /// + KeyValueWatchSubscription Watch(IList keys, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions); + + /// + /// Watch updates for a specific keys, starting from a specific revision. + /// + /// the keys + /// the watcher + /// the revision to start from + /// the watch options to apply. If multiple conflicting options are supplied, the last options wins. + /// + KeyValueWatchSubscription Watch(IList keys, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions); + /// /// Watch updates for all keys /// diff --git a/src/NATS.Client/KeyValue/KeyValue.cs b/src/NATS.Client/KeyValue/KeyValue.cs index 0a99269e5..b72a0da3e 100644 --- a/src/NATS.Client/KeyValue/KeyValue.cs +++ b/src/NATS.Client/KeyValue/KeyValue.cs @@ -12,6 +12,7 @@ // limitations under the License. using System; +using System.Collections; using System.Collections.Generic; using System.Text; using NATS.Client.Internals; @@ -175,28 +176,35 @@ public void Purge(string key, ulong expectedRevision) public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions) { - Validator.ValidateKvKeyWildcardAllowedRequired(key); - Validator.ValidateNotNull(watcher, "Watcher is required"); - return new KeyValueWatchSubscription(this, key, watcher, ConsumerConfiguration.UlongUnset, watchOptions); + return Watch(new List(key.Split(',')), watcher, ConsumerConfiguration.UlongUnset, watchOptions); } public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) { - Validator.ValidateKvKeyWildcardAllowedRequired(key); + return Watch(new List(key.Split(',')), watcher, fromRevision, watchOptions); + } + + public KeyValueWatchSubscription Watch(IList keys, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions) + { + return Watch(keys, watcher, ConsumerConfiguration.UlongUnset, watchOptions); + } + + public KeyValueWatchSubscription Watch(IList keys, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) + { + // all watch methods (Watch, WatchAll) delegate to here + Validator.ValidateKvKeysWildcardAllowedRequired(keys); Validator.ValidateNotNull(watcher, "Watcher is required"); - return new KeyValueWatchSubscription(this, key, watcher, fromRevision, watchOptions); + return new KeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions); } public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions) { - Validator.ValidateNotNull(watcher, "Watcher is required"); - return new KeyValueWatchSubscription(this, ">", watcher, ConsumerConfiguration.UlongUnset, watchOptions); + return Watch(new List {">"}, watcher, ConsumerConfiguration.UlongUnset, watchOptions); } public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) { - Validator.ValidateNotNull(watcher, "Watcher is required"); - return new KeyValueWatchSubscription(this, ">", watcher, fromRevision, watchOptions); + return Watch(new List {">"}, watcher, fromRevision, watchOptions); } private PublishAck _write(string key, byte[] data, MsgHeader h) { diff --git a/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs b/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs index 2ca87edd2..0cadc0e84 100644 --- a/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs +++ b/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs @@ -12,6 +12,7 @@ // limitations under the License. using System; +using System.Collections.Generic; using NATS.Client.Internals; using NATS.Client.JetStream; @@ -23,11 +24,15 @@ public class KeyValueWatchSubscription : IDisposable private readonly InterlockedBoolean endOfDataSent; private readonly object subLock; - public KeyValueWatchSubscription(KeyValue kv, string keyPattern, + public KeyValueWatchSubscription(KeyValue kv, IList keyPatterns, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) { subLock = new object(); - string subscribeSubject = kv.ReadSubject(keyPattern); + IList subscribeSubjects = new List(); + foreach (string keyPattern in keyPatterns) + { + subscribeSubjects.Add(kv.ReadSubject(keyPattern)); + } // figure out the result options bool headersOnly = false; @@ -52,7 +57,7 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern, else { fromRevision = ConsumerConfiguration.UlongUnset; // easier on the builder since we aren't starting at a fromRevision - if (deliverPolicy == DeliverPolicy.New || kv._getLast(subscribeSubject) == null) + if (deliverPolicy == DeliverPolicy.New) { endOfDataSent = new InterlockedBoolean(true); watcher.EndOfData(); @@ -72,7 +77,7 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern, .WithDeliverPolicy(deliverPolicy) .WithStartSequence(fromRevision) .WithHeadersOnly(headersOnly) - .WithFilterSubject(subscribeSubject) + .WithFilterSubjects(subscribeSubjects) .Build()) .Build(); @@ -91,7 +96,7 @@ void Handler(object sender, MsgHandlerEventArgs args) } } - sub = kv.js.PushSubscribeAsync(subscribeSubject, Handler, false, pso); + sub = kv.js.PushSubscribeAsync(null, Handler, false, pso); if (endOfDataSent.IsFalse()) { ulong pending = sub.GetConsumerInformation().CalculatedPending; diff --git a/src/Tests/IntegrationTests/TestKeyValue.cs b/src/Tests/IntegrationTests/TestKeyValue.cs index f074e9092..720efba3e 100644 --- a/src/Tests/IntegrationTests/TestKeyValue.cs +++ b/src/Tests/IntegrationTests/TestKeyValue.cs @@ -860,6 +860,8 @@ public void TestWatch() object[] allPutsExpecteds = { "a", "aa", "z", "zz", "aaa", "zzz", null }; + + IList allKeys = new List {key1, key2, keyNull}; Context.RunInJsServer(c => { @@ -899,6 +901,10 @@ public void TestWatch() TestKeyValueWatcher starMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly); TestKeyValueWatcher gtFullWatcher = new TestKeyValueWatcher(true); TestKeyValueWatcher gtMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly); + TestKeyValueWatcher multipleFullWatcher = new TestKeyValueWatcher(true); + TestKeyValueWatcher multipleMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly); + TestKeyValueWatcher multipleFullWatcher2 = new TestKeyValueWatcher(true); + TestKeyValueWatcher multipleMetaWatcher2 = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly); IList subs = new List(); subs.Add(kv.Watch(key1, key1FullWatcher, key1FullWatcher.WatchOptions)); @@ -915,6 +921,10 @@ public void TestWatch() subs.Add(kv.Watch("key.*", starMetaWatcher, starMetaWatcher.WatchOptions)); subs.Add(kv.Watch("key.>", gtFullWatcher, gtFullWatcher.WatchOptions)); subs.Add(kv.Watch("key.>", gtMetaWatcher, gtMetaWatcher.WatchOptions)); + subs.Add(kv.Watch(allKeys, multipleFullWatcher, multipleFullWatcher.WatchOptions)); + subs.Add(kv.Watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.WatchOptions)); + subs.Add(kv.Watch(string.Join(",", allKeys), multipleFullWatcher2, multipleFullWatcher2.WatchOptions)); + subs.Add(kv.Watch(string.Join(",", allKeys), multipleMetaWatcher2, multipleMetaWatcher2.WatchOptions)); kv.Put(key1, "a"); kv.Put(key1, "aa"); From b76b10e86f97a2546e8d28d9a47d892f59058fd3 Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 11 Apr 2024 06:45:06 -0400 Subject: [PATCH 2/3] KV Watch Multiple Filters --- src/Tests/IntegrationTests/TestKeyValue.cs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/Tests/IntegrationTests/TestKeyValue.cs b/src/Tests/IntegrationTests/TestKeyValue.cs index 720efba3e..3c38eda97 100644 --- a/src/Tests/IntegrationTests/TestKeyValue.cs +++ b/src/Tests/IntegrationTests/TestKeyValue.cs @@ -921,10 +921,14 @@ public void TestWatch() subs.Add(kv.Watch("key.*", starMetaWatcher, starMetaWatcher.WatchOptions)); subs.Add(kv.Watch("key.>", gtFullWatcher, gtFullWatcher.WatchOptions)); subs.Add(kv.Watch("key.>", gtMetaWatcher, gtMetaWatcher.WatchOptions)); - subs.Add(kv.Watch(allKeys, multipleFullWatcher, multipleFullWatcher.WatchOptions)); - subs.Add(kv.Watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.WatchOptions)); - subs.Add(kv.Watch(string.Join(",", allKeys), multipleFullWatcher2, multipleFullWatcher2.WatchOptions)); - subs.Add(kv.Watch(string.Join(",", allKeys), multipleMetaWatcher2, multipleMetaWatcher2.WatchOptions)); + + if (AtLeast2_10(c.ServerInfo)) + { + subs.Add(kv.Watch(allKeys, multipleFullWatcher, multipleFullWatcher.WatchOptions)); + subs.Add(kv.Watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.WatchOptions)); + subs.Add(kv.Watch(string.Join(",", allKeys), multipleFullWatcher2, multipleFullWatcher2.WatchOptions)); + subs.Add(kv.Watch(string.Join(",", allKeys), multipleMetaWatcher2, multipleMetaWatcher2.WatchOptions)); + } kv.Put(key1, "a"); kv.Put(key1, "aa"); From adccf5a5d4ad1b894a8328db543644facff403c7 Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 11 Apr 2024 07:01:43 -0400 Subject: [PATCH 3/3] removing comma separated support --- src/NATS.Client/KeyValue/IKeyValue.cs | 8 ++++---- src/NATS.Client/KeyValue/KeyValue.cs | 19 +++++++++++++------ src/Tests/IntegrationTests/TestKeyValue.cs | 4 ---- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/NATS.Client/KeyValue/IKeyValue.cs b/src/NATS.Client/KeyValue/IKeyValue.cs index 4ca04cfaf..eb94a7b86 100644 --- a/src/NATS.Client/KeyValue/IKeyValue.cs +++ b/src/NATS.Client/KeyValue/IKeyValue.cs @@ -108,18 +108,18 @@ public interface IKeyValue void Purge(string key, ulong expectedRevision); /// - /// Watch updates for a specific key or keys. + /// Watch updates for a specific key. /// - /// the key or a comma delimited list of keys + /// the key /// the watcher /// the watch options to apply. If multiple conflicting options are supplied, the last options wins. /// KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions); /// - /// Watch updates for a specific key or keys, starting from a specific revision. + /// Watch updates for a specific key, starting from a specific revision. /// - /// the key or a comma delimited list of keys + /// the key /// the watcher /// the revision to start from /// the watch options to apply. If multiple conflicting options are supplied, the last options wins. diff --git a/src/NATS.Client/KeyValue/KeyValue.cs b/src/NATS.Client/KeyValue/KeyValue.cs index b72a0da3e..2ce837040 100644 --- a/src/NATS.Client/KeyValue/KeyValue.cs +++ b/src/NATS.Client/KeyValue/KeyValue.cs @@ -176,22 +176,27 @@ public void Purge(string key, ulong expectedRevision) public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions) { - return Watch(new List(key.Split(',')), watcher, ConsumerConfiguration.UlongUnset, watchOptions); + Validator.ValidateKvKeyWildcardAllowedRequired(key); + Validator.ValidateNotNull(watcher, "Watcher is required"); + return new KeyValueWatchSubscription(this, new List {key}, watcher, ConsumerConfiguration.UlongUnset, watchOptions); } public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) { - return Watch(new List(key.Split(',')), watcher, fromRevision, watchOptions); + Validator.ValidateKvKeyWildcardAllowedRequired(key); + Validator.ValidateNotNull(watcher, "Watcher is required"); + return new KeyValueWatchSubscription(this, new List {key}, watcher, fromRevision, watchOptions); } public KeyValueWatchSubscription Watch(IList keys, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions) { - return Watch(keys, watcher, ConsumerConfiguration.UlongUnset, watchOptions); + Validator.ValidateKvKeysWildcardAllowedRequired(keys); + Validator.ValidateNotNull(watcher, "Watcher is required"); + return new KeyValueWatchSubscription(this, keys, watcher, ConsumerConfiguration.UlongUnset, watchOptions); } public KeyValueWatchSubscription Watch(IList keys, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) { - // all watch methods (Watch, WatchAll) delegate to here Validator.ValidateKvKeysWildcardAllowedRequired(keys); Validator.ValidateNotNull(watcher, "Watcher is required"); return new KeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions); @@ -199,12 +204,14 @@ public KeyValueWatchSubscription Watch(IList keys, IKeyValueWatcher watc public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions) { - return Watch(new List {">"}, watcher, ConsumerConfiguration.UlongUnset, watchOptions); + Validator.ValidateNotNull(watcher, "Watcher is required"); + return new KeyValueWatchSubscription(this, new List {">"}, watcher, ConsumerConfiguration.UlongUnset, watchOptions); } public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) { - return Watch(new List {">"}, watcher, fromRevision, watchOptions); + Validator.ValidateNotNull(watcher, "Watcher is required"); + return new KeyValueWatchSubscription(this, new List {">"}, watcher, fromRevision, watchOptions); } private PublishAck _write(string key, byte[] data, MsgHeader h) { diff --git a/src/Tests/IntegrationTests/TestKeyValue.cs b/src/Tests/IntegrationTests/TestKeyValue.cs index 3c38eda97..a7cfa3fae 100644 --- a/src/Tests/IntegrationTests/TestKeyValue.cs +++ b/src/Tests/IntegrationTests/TestKeyValue.cs @@ -903,8 +903,6 @@ public void TestWatch() TestKeyValueWatcher gtMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly); TestKeyValueWatcher multipleFullWatcher = new TestKeyValueWatcher(true); TestKeyValueWatcher multipleMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly); - TestKeyValueWatcher multipleFullWatcher2 = new TestKeyValueWatcher(true); - TestKeyValueWatcher multipleMetaWatcher2 = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly); IList subs = new List(); subs.Add(kv.Watch(key1, key1FullWatcher, key1FullWatcher.WatchOptions)); @@ -926,8 +924,6 @@ public void TestWatch() { subs.Add(kv.Watch(allKeys, multipleFullWatcher, multipleFullWatcher.WatchOptions)); subs.Add(kv.Watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.WatchOptions)); - subs.Add(kv.Watch(string.Join(",", allKeys), multipleFullWatcher2, multipleFullWatcher2.WatchOptions)); - subs.Add(kv.Watch(string.Join(",", allKeys), multipleMetaWatcher2, multipleMetaWatcher2.WatchOptions)); } kv.Put(key1, "a");