Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KV Watch Multiple Filters #887

Merged
merged 3 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/NATS.Client/Internals/Validator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,16 @@ public static string ValidatePrefixOrDomain(string s, string label, bool require
});
}

public static string ValidateKvKeyWildcardAllowedRequired(string s) {
public static IList<String> ValidateKvKeysWildcardAllowedRequired(IList<String> keys) {
Required(keys, "Key");
foreach (string key in keys) {
ValidateWildcardKvKey(key, "Key", true);
}
return keys;
}

public static string ValidateKvKeyWildcardAllowedRequired(string s)
{
return ValidateWildcardKvKey(s, "Key", true);
}

Expand Down
23 changes: 21 additions & 2 deletions src/NATS.Client/KeyValue/IKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public interface IKeyValue
void Purge(string key, ulong expectedRevision);

/// <summary>
/// Watch updates for a specific key
/// Watch updates for a specific key.
/// </summary>
/// <param name="key">the key</param>
/// <param name="watcher">the watcher</param>
Expand All @@ -117,7 +117,7 @@ public interface IKeyValue
KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for a specific key, starting from a specific revision
/// Watch updates for a specific key, starting from a specific revision.
/// </summary>
/// <param name="key">the key</param>
/// <param name="watcher">the watcher</param>
Expand All @@ -126,6 +126,25 @@ public interface IKeyValue
/// <returns></returns>
KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for a specific keys.
/// </summary>
/// <param name="keys">the keys</param>
/// <param name="watcher">the watcher</param>
/// <param name="watchOptions">the watch options to apply. If multiple conflicting options are supplied, the last options wins.</param>
/// <returns></returns>
KeyValueWatchSubscription Watch(IList<string> keys, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for a specific keys, starting from a specific revision.
/// </summary>
/// <param name="keys">the keys</param>
/// <param name="watcher">the watcher</param>
/// <param name="fromRevision">the revision to start from</param>
/// <param name="watchOptions">the watch options to apply. If multiple conflicting options are supplied, the last options wins.</param>
/// <returns></returns>
KeyValueWatchSubscription Watch(IList<string> keys, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for all keys
/// </summary>
Expand Down
23 changes: 19 additions & 4 deletions src/NATS.Client/KeyValue/KeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using NATS.Client.Internals;
Expand Down Expand Up @@ -177,26 +178,40 @@ public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, par
{
Validator.ValidateKvKeyWildcardAllowedRequired(key);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, key, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
return new KeyValueWatchSubscription(this, new List<string> {key}, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
}

public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateKvKeyWildcardAllowedRequired(key);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, key, watcher, fromRevision, watchOptions);
return new KeyValueWatchSubscription(this, new List<string> {key}, watcher, fromRevision, watchOptions);
}

public KeyValueWatchSubscription Watch(IList<string> keys, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateKvKeysWildcardAllowedRequired(keys);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, keys, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
}

public KeyValueWatchSubscription Watch(IList<string> keys, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateKvKeysWildcardAllowedRequired(keys);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions);
}

public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, ">", watcher, ConsumerConfiguration.UlongUnset, watchOptions);
return new KeyValueWatchSubscription(this, new List<string> {">"}, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
}

public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, ">", watcher, fromRevision, watchOptions);
return new KeyValueWatchSubscription(this, new List<string> {">"}, watcher, fromRevision, watchOptions);
}

private PublishAck _write(string key, byte[] data, MsgHeader h) {
Expand Down
15 changes: 10 additions & 5 deletions src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

using System;
using System.Collections.Generic;
using NATS.Client.Internals;
using NATS.Client.JetStream;

Expand All @@ -23,11 +24,15 @@ public class KeyValueWatchSubscription : IDisposable
private readonly InterlockedBoolean endOfDataSent;
private readonly object subLock;

public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
public KeyValueWatchSubscription(KeyValue kv, IList<string> keyPatterns,
IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
subLock = new object();
string subscribeSubject = kv.ReadSubject(keyPattern);
IList<string> subscribeSubjects = new List<string>();
foreach (string keyPattern in keyPatterns)
{
subscribeSubjects.Add(kv.ReadSubject(keyPattern));
}

// figure out the result options
bool headersOnly = false;
Expand All @@ -52,7 +57,7 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
else
{
fromRevision = ConsumerConfiguration.UlongUnset; // easier on the builder since we aren't starting at a fromRevision
if (deliverPolicy == DeliverPolicy.New || kv._getLast(subscribeSubject) == null)
mtmk marked this conversation as resolved.
Show resolved Hide resolved
if (deliverPolicy == DeliverPolicy.New)
{
endOfDataSent = new InterlockedBoolean(true);
watcher.EndOfData();
Expand All @@ -72,7 +77,7 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
.WithDeliverPolicy(deliverPolicy)
.WithStartSequence(fromRevision)
.WithHeadersOnly(headersOnly)
.WithFilterSubject(subscribeSubject)
.WithFilterSubjects(subscribeSubjects)
.Build())
.Build();

Expand All @@ -91,7 +96,7 @@ void Handler(object sender, MsgHandlerEventArgs args)
}
}

sub = kv.js.PushSubscribeAsync(subscribeSubject, Handler, false, pso);
sub = kv.js.PushSubscribeAsync(null, Handler, false, pso);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filter subject is used when subscribe subject is null

if (endOfDataSent.IsFalse())
{
ulong pending = sub.GetConsumerInformation().CalculatedPending;
Expand Down
10 changes: 10 additions & 0 deletions src/Tests/IntegrationTests/TestKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,8 @@ public void TestWatch()
object[] allPutsExpecteds = {
"a", "aa", "z", "zz", "aaa", "zzz", null
};

IList<string> allKeys = new List<string> {key1, key2, keyNull};

Context.RunInJsServer(c =>
{
Expand Down Expand Up @@ -899,6 +901,8 @@ public void TestWatch()
TestKeyValueWatcher starMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly);
TestKeyValueWatcher gtFullWatcher = new TestKeyValueWatcher(true);
TestKeyValueWatcher gtMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly);
TestKeyValueWatcher multipleFullWatcher = new TestKeyValueWatcher(true);
TestKeyValueWatcher multipleMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly);

IList<KeyValueWatchSubscription> subs = new List<KeyValueWatchSubscription>();
subs.Add(kv.Watch(key1, key1FullWatcher, key1FullWatcher.WatchOptions));
Expand All @@ -916,6 +920,12 @@ public void TestWatch()
subs.Add(kv.Watch("key.>", gtFullWatcher, gtFullWatcher.WatchOptions));
subs.Add(kv.Watch("key.>", gtMetaWatcher, gtMetaWatcher.WatchOptions));

if (AtLeast2_10(c.ServerInfo))
{
subs.Add(kv.Watch(allKeys, multipleFullWatcher, multipleFullWatcher.WatchOptions));
subs.Add(kv.Watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.WatchOptions));
}

kv.Put(key1, "a");
kv.Put(key1, "aa");
kv.Put(key2, "z");
Expand Down
Loading