diff --git a/src/NATS.Client/Internals/Validator.cs b/src/NATS.Client/Internals/Validator.cs index 79db2c3f..1817277b 100644 --- a/src/NATS.Client/Internals/Validator.cs +++ b/src/NATS.Client/Internals/Validator.cs @@ -158,7 +158,16 @@ public static string ValidatePrefixOrDomain(string s, string label, bool require }); } - public static string ValidateKvKeyWildcardAllowedRequired(string s) { + public static IList ValidateKvKeysWildcardAllowedRequired(IList keys) { + Required(keys, "Key"); + foreach (string key in keys) { + ValidateWildcardKvKey(key, "Key", true); + } + return keys; + } + + public static string ValidateKvKeyWildcardAllowedRequired(string s) + { return ValidateWildcardKvKey(s, "Key", true); } diff --git a/src/NATS.Client/KeyValue/IKeyValue.cs b/src/NATS.Client/KeyValue/IKeyValue.cs index 36e7838a..eb94a7b8 100644 --- a/src/NATS.Client/KeyValue/IKeyValue.cs +++ b/src/NATS.Client/KeyValue/IKeyValue.cs @@ -108,7 +108,7 @@ public interface IKeyValue void Purge(string key, ulong expectedRevision); /// - /// Watch updates for a specific key + /// Watch updates for a specific key. /// /// the key /// the watcher @@ -117,7 +117,7 @@ public interface IKeyValue KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions); /// - /// Watch updates for a specific key, starting from a specific revision + /// Watch updates for a specific key, starting from a specific revision. /// /// the key /// the watcher @@ -126,6 +126,25 @@ public interface IKeyValue /// KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions); + /// + /// Watch updates for a specific keys. + /// + /// the keys + /// the watcher + /// the watch options to apply. If multiple conflicting options are supplied, the last options wins. + /// + KeyValueWatchSubscription Watch(IList keys, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions); + + /// + /// Watch updates for a specific keys, starting from a specific revision. + /// + /// the keys + /// the watcher + /// the revision to start from + /// the watch options to apply. If multiple conflicting options are supplied, the last options wins. + /// + KeyValueWatchSubscription Watch(IList keys, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions); + /// /// Watch updates for all keys /// diff --git a/src/NATS.Client/KeyValue/KeyValue.cs b/src/NATS.Client/KeyValue/KeyValue.cs index 0a99269e..2ce83704 100644 --- a/src/NATS.Client/KeyValue/KeyValue.cs +++ b/src/NATS.Client/KeyValue/KeyValue.cs @@ -12,6 +12,7 @@ // limitations under the License. using System; +using System.Collections; using System.Collections.Generic; using System.Text; using NATS.Client.Internals; @@ -177,26 +178,40 @@ public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, par { Validator.ValidateKvKeyWildcardAllowedRequired(key); Validator.ValidateNotNull(watcher, "Watcher is required"); - return new KeyValueWatchSubscription(this, key, watcher, ConsumerConfiguration.UlongUnset, watchOptions); + return new KeyValueWatchSubscription(this, new List {key}, watcher, ConsumerConfiguration.UlongUnset, watchOptions); } public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) { Validator.ValidateKvKeyWildcardAllowedRequired(key); Validator.ValidateNotNull(watcher, "Watcher is required"); - return new KeyValueWatchSubscription(this, key, watcher, fromRevision, watchOptions); + return new KeyValueWatchSubscription(this, new List {key}, watcher, fromRevision, watchOptions); + } + + public KeyValueWatchSubscription Watch(IList keys, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions) + { + Validator.ValidateKvKeysWildcardAllowedRequired(keys); + Validator.ValidateNotNull(watcher, "Watcher is required"); + return new KeyValueWatchSubscription(this, keys, watcher, ConsumerConfiguration.UlongUnset, watchOptions); + } + + public KeyValueWatchSubscription Watch(IList keys, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) + { + Validator.ValidateKvKeysWildcardAllowedRequired(keys); + Validator.ValidateNotNull(watcher, "Watcher is required"); + return new KeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions); } public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions) { Validator.ValidateNotNull(watcher, "Watcher is required"); - return new KeyValueWatchSubscription(this, ">", watcher, ConsumerConfiguration.UlongUnset, watchOptions); + return new KeyValueWatchSubscription(this, new List {">"}, watcher, ConsumerConfiguration.UlongUnset, watchOptions); } public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) { Validator.ValidateNotNull(watcher, "Watcher is required"); - return new KeyValueWatchSubscription(this, ">", watcher, fromRevision, watchOptions); + return new KeyValueWatchSubscription(this, new List {">"}, watcher, fromRevision, watchOptions); } private PublishAck _write(string key, byte[] data, MsgHeader h) { diff --git a/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs b/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs index 2ca87edd..0cadc0e8 100644 --- a/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs +++ b/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs @@ -12,6 +12,7 @@ // limitations under the License. using System; +using System.Collections.Generic; using NATS.Client.Internals; using NATS.Client.JetStream; @@ -23,11 +24,15 @@ public class KeyValueWatchSubscription : IDisposable private readonly InterlockedBoolean endOfDataSent; private readonly object subLock; - public KeyValueWatchSubscription(KeyValue kv, string keyPattern, + public KeyValueWatchSubscription(KeyValue kv, IList keyPatterns, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) { subLock = new object(); - string subscribeSubject = kv.ReadSubject(keyPattern); + IList subscribeSubjects = new List(); + foreach (string keyPattern in keyPatterns) + { + subscribeSubjects.Add(kv.ReadSubject(keyPattern)); + } // figure out the result options bool headersOnly = false; @@ -52,7 +57,7 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern, else { fromRevision = ConsumerConfiguration.UlongUnset; // easier on the builder since we aren't starting at a fromRevision - if (deliverPolicy == DeliverPolicy.New || kv._getLast(subscribeSubject) == null) + if (deliverPolicy == DeliverPolicy.New) { endOfDataSent = new InterlockedBoolean(true); watcher.EndOfData(); @@ -72,7 +77,7 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern, .WithDeliverPolicy(deliverPolicy) .WithStartSequence(fromRevision) .WithHeadersOnly(headersOnly) - .WithFilterSubject(subscribeSubject) + .WithFilterSubjects(subscribeSubjects) .Build()) .Build(); @@ -91,7 +96,7 @@ void Handler(object sender, MsgHandlerEventArgs args) } } - sub = kv.js.PushSubscribeAsync(subscribeSubject, Handler, false, pso); + sub = kv.js.PushSubscribeAsync(null, Handler, false, pso); if (endOfDataSent.IsFalse()) { ulong pending = sub.GetConsumerInformation().CalculatedPending; diff --git a/src/Tests/IntegrationTests/TestKeyValue.cs b/src/Tests/IntegrationTests/TestKeyValue.cs index f074e909..a7cfa3fa 100644 --- a/src/Tests/IntegrationTests/TestKeyValue.cs +++ b/src/Tests/IntegrationTests/TestKeyValue.cs @@ -860,6 +860,8 @@ public void TestWatch() object[] allPutsExpecteds = { "a", "aa", "z", "zz", "aaa", "zzz", null }; + + IList allKeys = new List {key1, key2, keyNull}; Context.RunInJsServer(c => { @@ -899,6 +901,8 @@ public void TestWatch() TestKeyValueWatcher starMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly); TestKeyValueWatcher gtFullWatcher = new TestKeyValueWatcher(true); TestKeyValueWatcher gtMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly); + TestKeyValueWatcher multipleFullWatcher = new TestKeyValueWatcher(true); + TestKeyValueWatcher multipleMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly); IList subs = new List(); subs.Add(kv.Watch(key1, key1FullWatcher, key1FullWatcher.WatchOptions)); @@ -916,6 +920,12 @@ public void TestWatch() subs.Add(kv.Watch("key.>", gtFullWatcher, gtFullWatcher.WatchOptions)); subs.Add(kv.Watch("key.>", gtMetaWatcher, gtMetaWatcher.WatchOptions)); + if (AtLeast2_10(c.ServerInfo)) + { + subs.Add(kv.Watch(allKeys, multipleFullWatcher, multipleFullWatcher.WatchOptions)); + subs.Add(kv.Watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.WatchOptions)); + } + kv.Put(key1, "a"); kv.Put(key1, "aa"); kv.Put(key2, "z");