Skip to content

Commit

Permalink
KV Watch From Revision (#866)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Feb 19, 2024
1 parent bef085d commit 51a6e79
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 28 deletions.
19 changes: 19 additions & 0 deletions src/NATS.Client/KeyValue/IKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public interface IKeyValue
/// <returns></returns>
KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for a specific key, starting from a specific revision
/// </summary>
/// <param name="key">the key</param>
/// <param name="watcher">the watcher</param>
/// <param name="fromRevision">the revision to start from</param>
/// <param name="watchOptions">the watch options to apply. If multiple conflicting options are supplied, the last options wins.</param>
/// <returns></returns>
KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for all keys
/// </summary>
Expand All @@ -110,6 +120,15 @@ public interface IKeyValue
/// <returns>The KeyValueWatchSubscription</returns>
KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for all keys, starting from a specific revision
/// </summary>
/// <param name="watcher">the watcher</param>
/// <param name="fromRevision">the revision to start from</param>
/// <param name="watchOptions">the watch options to apply. If multiple conflicting options are supplied, the last options wins.</param>
/// <returns>The KeyValueWatchSubscription</returns>
KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Get a list of the keys in a bucket.
/// </summary>
Expand Down
17 changes: 15 additions & 2 deletions src/NATS.Client/KeyValue/KeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,26 @@ public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, par
{
Validator.ValidateKvKeyWildcardAllowedRequired(key);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, key, watcher, watchOptions);
return new KeyValueWatchSubscription(this, key, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
}

public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateKvKeyWildcardAllowedRequired(key);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, key, watcher, fromRevision, watchOptions);
}

public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, ">", watcher, watchOptions);
return new KeyValueWatchSubscription(this, ">", watcher, ConsumerConfiguration.UlongUnset, watchOptions);
}

public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, ">", watcher, fromRevision, watchOptions);
}

private PublishAck _write(string key, byte[] data, MsgHeader h) {
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/KeyValue/KeyValueWatchOption.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public enum KeyValueWatchOption
IgnoreDelete,

/// <summary>
/// Only get meta data, skip value when retrieving data from the server.
/// Only get metadata, skip value when retrieving data from the server.
/// </summary>
MetaOnly,

Expand Down
26 changes: 18 additions & 8 deletions src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class KeyValueWatchSubscription : IDisposable
private readonly object subLock;

public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions)
IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
subLock = new object();
string subscribeSubject = kv.ReadSubject(keyPattern);
Expand All @@ -44,14 +44,23 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
}
}

if (deliverPolicy == DeliverPolicy.New || kv._getLast(subscribeSubject) == null)
if (fromRevision > 0)
{
endOfDataSent = new InterlockedBoolean(true);
watcher.EndOfData();
deliverPolicy = DeliverPolicy.ByStartSequence;
endOfDataSent = new InterlockedBoolean();
}
else
{
endOfDataSent = new InterlockedBoolean(false);
fromRevision = ConsumerConfiguration.UlongUnset; // easier on the builder since we aren't starting at a fromRevision
if (deliverPolicy == DeliverPolicy.New || kv._getLast(subscribeSubject) == null)
{
endOfDataSent = new InterlockedBoolean(true);
watcher.EndOfData();
}
else
{
endOfDataSent = new InterlockedBoolean();
}
}

PushSubscribeOptions pso = PushSubscribeOptions.Builder()
Expand All @@ -61,12 +70,13 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
ConsumerConfiguration.Builder()
.WithAckPolicy(AckPolicy.None)
.WithDeliverPolicy(deliverPolicy)
.WithStartSequence(fromRevision)
.WithHeadersOnly(headersOnly)
.WithFilterSubject(subscribeSubject)
.Build())
.Build();

EventHandler<MsgHandlerEventArgs> handler = (sender, args) =>
void Handler(object sender, MsgHandlerEventArgs args)
{
KeyValueEntry kve = new KeyValueEntry(args.Message);
if (includeDeletes || kve.Operation.Equals(KeyValueOperation.Put))
Expand All @@ -79,9 +89,9 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
endOfDataSent.Set(true);
watcher.EndOfData();
}
};
}

sub = kv.js.PushSubscribeAsync(subscribeSubject, handler, false, pso);
sub = kv.js.PushSubscribeAsync(subscribeSubject, Handler, false, pso);
if (endOfDataSent.IsFalse())
{
ulong pending = sub.GetConsumerInformation().CalculatedPending;
Expand Down
71 changes: 54 additions & 17 deletions src/Tests/IntegrationTests/TestKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -738,28 +738,38 @@ public void TestWatch()
string key1 = "key.1";
string key2 = "key.2";

Object[] key1AllExpecteds = new Object[] {
object[] key1AllExpecteds = {
"a", "aa", KeyValueOperation.Delete, "aaa", KeyValueOperation.Delete, KeyValueOperation.Purge
};

object[] key1FromRevisionExpecteds = {
"aa", KeyValueOperation.Delete, "aaa"
};

Object[] noExpecteds = new Object[0];
Object[] purgeOnlyExpecteds = { KeyValueOperation.Purge };
object[] noExpecteds = Array.Empty<object>();
object[] purgeOnlyExpecteds = { KeyValueOperation.Purge };

Object[] key2AllExpecteds = {
object[] key2AllExpecteds = {
"z", "zz", KeyValueOperation.Delete, "zzz"
};

Object[] key2AfterExpecteds = { "zzz" };
object[] key2AfterExpecteds = { "zzz" };

Object[] allExpecteds = new object[] {
object[] allExpecteds = {
"a", "aa", "z", "zz",
KeyValueOperation.Delete, KeyValueOperation.Delete,
"aaa", "zzz",
KeyValueOperation.Delete, KeyValueOperation.Purge,
null
};

Object[] allPutsExpecteds = {
object[] allFromRevisionExpecteds = {
"aa", "z", "zz",
KeyValueOperation.Delete, KeyValueOperation.Delete,
"aaa", "zzz"
};

object[] allPutsExpecteds = {
"a", "aa", "z", "zz", "aaa", "zzz", null
};

Expand All @@ -768,14 +778,23 @@ public void TestWatch()
// get the kv management context
IKeyValueManagement kvm = c.CreateKeyValueManagementContext();
// create the bucket
string bucket1 = Bucket(1);
string bucket2 = Bucket(2);
// create the buckets
kvm.Create(KeyValueConfiguration.Builder()
.WithName(BUCKET)
.WithName(bucket1)
.WithMaxHistoryPerKey(10)
.WithStorageType(StorageType.Memory)
.Build());
IKeyValue kv = c.CreateKeyValueContext(BUCKET);
kvm.Create(KeyValueConfiguration.Builder()
.WithName(bucket2)
.WithMaxHistoryPerKey(10)
.WithStorageType(StorageType.Memory)
.Build());
IKeyValue kv = c.CreateKeyValueContext(bucket1);
IKeyValue kv2 = c.CreateKeyValueContext(bucket2);
TestKeyValueWatcher key1FullWatcher = new TestKeyValueWatcher(true);
TestKeyValueWatcher key1MetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly);
Expand Down Expand Up @@ -819,7 +838,16 @@ public void TestWatch()
kv.Delete(key1);
kv.Purge(key1);
kv.Put(keyNull, (byte[])null);
kv2.Put(key1, "a");
kv2.Put(key1, "aa");
kv2.Put(key2, "z");
kv2.Put(key2, "zz");
kv2.Delete(key1);
kv2.Delete(key2);
kv2.Put(key1, "aaa");
kv2.Put(key2, "zzz");
Thread.Sleep(100); // give time for all the data to be setup
TestKeyValueWatcher key1AfterWatcher = new TestKeyValueWatcher(false, KeyValueWatchOption.MetaOnly);
Expand All @@ -830,6 +858,9 @@ public void TestWatch()
TestKeyValueWatcher key2AfterStartNewWatcher = new TestKeyValueWatcher(false, KeyValueWatchOption.MetaOnly, KeyValueWatchOption.UpdatesOnly);
TestKeyValueWatcher key2AfterStartFirstWatcher = new TestKeyValueWatcher(false, KeyValueWatchOption.MetaOnly, KeyValueWatchOption.IncludeHistory);
TestKeyValueWatcher key1FromRevisionAfterWatcher = new TestKeyValueWatcher(false);
TestKeyValueWatcher allFromRevisionAfterWatcher = new TestKeyValueWatcher(false);
subs.Add(kv.Watch(key1, key1AfterWatcher, key1AfterWatcher.WatchOptions));
subs.Add(kv.Watch(key1, key1AfterIgDelWatcher, key1AfterIgDelWatcher.WatchOptions));
subs.Add(kv.Watch(key1, key1AfterStartNewWatcher, key1AfterStartNewWatcher.WatchOptions));
Expand All @@ -838,6 +869,9 @@ public void TestWatch()
subs.Add(kv.Watch(key2, key2AfterStartNewWatcher, key2AfterStartNewWatcher.WatchOptions));
subs.Add(kv.Watch(key2, key2AfterStartFirstWatcher, key2AfterStartFirstWatcher.WatchOptions));
subs.Add(kv2.Watch(key1, key1FromRevisionAfterWatcher, 2, key1FromRevisionAfterWatcher.WatchOptions));
subs.Add(kv2.WatchAll(allFromRevisionAfterWatcher, 2, allFromRevisionAfterWatcher.WatchOptions));
Thread.Sleep(2000); // give time for the watches to get messages
// unsubscribe so the watchers don't get any more messages
Expand Down Expand Up @@ -876,6 +910,9 @@ public void TestWatch()
ValidateWatcher(key2AfterExpecteds, key2AfterWatcher);
ValidateWatcher(noExpecteds, key2AfterStartNewWatcher);
ValidateWatcher(key2AllExpecteds, key2AfterStartFirstWatcher);
ValidateWatcher(key1FromRevisionExpecteds, key1FromRevisionAfterWatcher);
ValidateWatcher(allFromRevisionExpecteds, allFromRevisionAfterWatcher);
});
}

Expand All @@ -899,7 +936,7 @@ private void ValidateWatcher(object[] expectedKves, TestKeyValueWatcher watcher)
Assert.True(lastRevision < kve.Revision);
lastRevision = kve.Revision;

Object expected = expectedKves[aix++];
object expected = expectedKves[aix++];
if (expected == null) {
Assert.Equal(KeyValueOperation.Put, kve.Operation);
Assert.True(kve.Value == null || kve.Value.Length == 0);
Expand Down Expand Up @@ -1024,7 +1061,7 @@ public void TestWithAccount()
AssertKvAccountKeys(kv_connA_bucketI.Keys(), Key(21), Key(22));
AssertKvAccountKeys(kv_connI_bucketI.Keys(), Key(21), Key(22));

Object[] expecteds = {
object[] expecteds = {
Data(0), Data(1), KeyValueOperation.Delete, KeyValueOperation.Purge, Data(2),
Data(0), Data(1), KeyValueOperation.Delete, KeyValueOperation.Purge, Data(2)
};
Expand Down Expand Up @@ -1074,7 +1111,7 @@ private void AssertKveAccount(IKeyValue kvWorker, string key, IKeyValue kvUserA,
AssertKveAccountGet(kvUserA, kvUserI, key, Data(2));
}

private void AssertKveAccountHistory(IList<KeyValueEntry> history, params Object[] expecteds) {
private void AssertKveAccountHistory(IList<KeyValueEntry> history, params object[] expecteds) {
Assert.Equal(expecteds.Length, history.Count);
for (int x = 0; x < expecteds.Length; x++) {
if (expecteds[x] is string expected) {
Expand Down Expand Up @@ -1243,7 +1280,7 @@ private void _testMirror(IKeyValue okv, IKeyValue mkv, int num) {
{
Thread.Sleep(200); // give the messages time to propagate
}
ValidateWatcher(new Object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, mWatcher);
ValidateWatcher(new object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, mWatcher);

// Does the origin data match?
if (okv != null) {
Expand All @@ -1252,7 +1289,7 @@ private void _testMirror(IKeyValue okv, IKeyValue mkv, int num) {
{
Thread.Sleep(200); // give the messages time to propagate
}
ValidateWatcher(new Object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, oWatcher);
ValidateWatcher(new object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, oWatcher);
}
}

Expand Down Expand Up @@ -1299,7 +1336,7 @@ public void TestDontGetNoResponders()
}
}

class TestKeyValueWatcher : IKeyValueWatcher
class TestKeyValueWatcher : IKeyValueWatcher
{
public IList<KeyValueEntry> Entries = new List<KeyValueEntry>();
public KeyValueWatchOption[] WatchOptions;
Expand Down

0 comments on commit 51a6e79

Please sign in to comment.