From e00c21885d1cc51516d587b6a45e72bfda45a416 Mon Sep 17 00:00:00 2001 From: Michael Landis Date: Mon, 23 Sep 2024 14:45:37 -0700 Subject: [PATCH] feat: add an topic event enumerable (#575) Introduces an interface ITopicEvent which covers both normal messages (TopicMessage) and system events (TopicSystemEvent). We refactor the internals of the stream to be over ITopicEvents, so that we can filter to just TopicMessage (as is implemented currently) or pass through all events in an event enumerable. To enumerate over all events in the stream, call GetAllEventsAsyncEnumerable, or WithCancellationForAllEvents. This largely parallels the existing enumerator over messages only. See the integration tests for an example usage. --- src/Momento.Sdk/Internal/ScsTopicClient.cs | 13 +- .../Responses/Topic/ITopicEvent.cs | 12 ++ .../Responses/Topic/TopicMessage.cs | 15 +- .../Responses/Topic/TopicSubscribeResponse.cs | 128 ++++++++++++++++-- .../Responses/Topic/TopicSystemEvent.cs | 56 ++++++++ .../Momento.Sdk.Tests/Topics/TopicTest.cs | 88 +++++++++++- 6 files changed, 294 insertions(+), 18 deletions(-) create mode 100644 src/Momento.Sdk/Responses/Topic/ITopicEvent.cs create mode 100644 src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs diff --git a/src/Momento.Sdk/Internal/ScsTopicClient.cs b/src/Momento.Sdk/Internal/ScsTopicClient.cs index 41754939..7517c108 100644 --- a/src/Momento.Sdk/Internal/ScsTopicClient.cs +++ b/src/Momento.Sdk/Internal/ScsTopicClient.cs @@ -142,7 +142,7 @@ private async Task SendSubscribe(string cacheName, strin } var response = new TopicSubscribeResponse.Subscription( - cancellationToken => subscriptionWrapper.GetNextRelevantMessageFromGrpcStreamAsync(cancellationToken), + cancellationToken => subscriptionWrapper.GetNextEventFromGrpcStreamAsync(cancellationToken), subscriptionWrapper.Dispose); return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicSubscribe, cacheName, topicName, response); @@ -198,7 +198,7 @@ public async Task Subscribe() _subscribed = true; } - public async ValueTask GetNextRelevantMessageFromGrpcStreamAsync( + public async ValueTask GetNextEventFromGrpcStreamAsync( CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) @@ -251,10 +251,10 @@ public async Task Subscribe() { case _TopicValue.KindOneofCase.Text: _logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName); - return new TopicMessage.Text(message.Item.Value, checked((long)message.Item.TopicSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId); + return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId); case _TopicValue.KindOneofCase.Binary: _logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName); - return new TopicMessage.Binary(message.Item.Value, checked((long)message.Item.TopicSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId); + return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId); case _TopicValue.KindOneofCase.None: default: _logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName); @@ -266,10 +266,11 @@ public async Task Subscribe() _logger.LogTraceTopicDiscontinuityReceived(_cacheName, _topicName, message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence); _lastSequenceNumber = message.Discontinuity.NewTopicSequence; - break; + return new TopicSystemEvent.Discontinuity(checked((long)message.Discontinuity.LastTopicSequence), + checked((long)message.Discontinuity.NewTopicSequence)); case _SubscriptionItem.KindOneofCase.Heartbeat: _logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName); - break; + return new TopicSystemEvent.Heartbeat(); case _SubscriptionItem.KindOneofCase.None: _logger.LogTraceTopicMessageReceived("none", _cacheName, _topicName); break; diff --git a/src/Momento.Sdk/Responses/Topic/ITopicEvent.cs b/src/Momento.Sdk/Responses/Topic/ITopicEvent.cs new file mode 100644 index 00000000..c1f8ebd2 --- /dev/null +++ b/src/Momento.Sdk/Responses/Topic/ITopicEvent.cs @@ -0,0 +1,12 @@ +namespace Momento.Sdk.Responses; + +/// +/// Represents an event that can be published to a topic. +/// +/// This includes not just topic mesasges, but also system events +/// such as discontinuities and heartbeats. +/// +public interface ITopicEvent +{ + +} diff --git a/src/Momento.Sdk/Responses/Topic/TopicMessage.cs b/src/Momento.Sdk/Responses/Topic/TopicMessage.cs index 1e8714ab..0b132cb3 100644 --- a/src/Momento.Sdk/Responses/Topic/TopicMessage.cs +++ b/src/Momento.Sdk/Responses/Topic/TopicMessage.cs @@ -1,5 +1,6 @@ using Momento.Protos.CacheClient.Pubsub; using Momento.Sdk.Exceptions; +using Momento.Sdk.Internal.ExtensionMethods; namespace Momento.Sdk.Responses; @@ -31,7 +32,7 @@ namespace Momento.Sdk.Responses; /// } /// /// -public abstract class TopicMessage +public abstract class TopicMessage : ITopicEvent { /// /// A topic message containing a text value. @@ -63,6 +64,12 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = /// This can be used to securely identify the sender of a message. /// public string? TokenId { get; } + + /// + public override string ToString() + { + return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\""; + } } /// @@ -95,6 +102,12 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId /// This can be used to securely identify the sender of a message. /// public string? TokenId { get; } + + /// + public override string ToString() + { + return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\""; + } } /// diff --git a/src/Momento.Sdk/Responses/Topic/TopicSubscribeResponse.cs b/src/Momento.Sdk/Responses/Topic/TopicSubscribeResponse.cs index 2371defc..8b1258e5 100644 --- a/src/Momento.Sdk/Responses/Topic/TopicSubscribeResponse.cs +++ b/src/Momento.Sdk/Responses/Topic/TopicSubscribeResponse.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Net; using System.Threading; using System.Threading.Tasks; using Momento.Sdk.Exceptions; @@ -43,28 +44,60 @@ public abstract class TopicSubscribeResponse /// public class Subscription : TopicSubscribeResponse, IDisposable, IAsyncEnumerable { - private readonly Func> _moveNextFunction; + private readonly Func> _moveNextFunction; private CancellationTokenSource _subscriptionCancellationToken = new(); private readonly Action _disposalAction; /// /// Constructs a Subscription with a wrapped topic iterator and an action to dispose of it. /// - public Subscription(Func> moveNextFunction, Action disposalAction) + public Subscription(Func> moveNextFunction, Action disposalAction) { _moveNextFunction = moveNextFunction; _disposalAction = disposalAction; } /// - /// Gets the enumerator for this topic. This subscription represents a single view on a topic, so multiple + /// Gets the message enumerator for this topic. Includes text and binary messages, but excludes system events. + /// + public IAsyncEnumerable WithCancellation(CancellationToken cancellationToken) + { + return new AsyncEnumerableWrapper(GetAsyncEnumerator(cancellationToken)); + } + + /// + /// Gets the event enumerator with cancellation for all topic events, including system events. + /// + public IAsyncEnumerable WithCancellationForAllEvents(CancellationToken cancellationToken) + { + return new AsyncEnumerableWrapper(GetAllEventsAsyncEnumerator(cancellationToken)); + } + + /// + /// Gets the message enumerator for this topic. Includes text and binary messages, but excludes system events. + /// + /// This subscription represents a single view on a topic, so multiple /// enumerators will interfere with each other. /// + /// A cancellation token to cancel the enumeration. + /// An async enumerator for the topic messages. public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) { return new TopicMessageEnumerator(_moveNextFunction, _subscriptionCancellationToken.Token, cancellationToken); } + /// + /// Gets an enumerator for all events on this topic, including text and binary messages, and system events. + /// + /// This subscription represents a single view on a topic, so multiple + /// enumerators will interfere with each other. + /// + /// A cancellation token to cancel the enumeration. + /// An async enumerator for all events on the topic. + IAsyncEnumerator GetAllEventsAsyncEnumerator(CancellationToken cancellationToken) + { + return new AllTopicEventsEnumerator(_moveNextFunction, _subscriptionCancellationToken.Token, cancellationToken); + } /// /// Unsubscribe from this topic. @@ -78,12 +111,12 @@ public void Dispose() private class TopicMessageEnumerator : IAsyncEnumerator { - private readonly Func> _moveNextFunction; + private readonly Func> _moveNextFunction; private readonly CancellationToken _subscriptionCancellationToken; private readonly CancellationToken _enumeratorCancellationToken; public TopicMessageEnumerator( - Func> moveNextFunction, + Func> moveNextFunction, CancellationToken subscriptionCancellationToken, CancellationToken enumeratorCancellationToken) { @@ -94,6 +127,63 @@ public TopicMessageEnumerator( public TopicMessage? Current { get; private set; } + public async ValueTask MoveNextAsync() + { + // We iterate over the stream until we get a TopicMessage, an error, or the stream is closed. + // We skip over system events like heartbeats and discontinuities. + while (true) + { + if (_subscriptionCancellationToken.IsCancellationRequested || _enumeratorCancellationToken.IsCancellationRequested) + { + Current = null; + return false; + } + + var nextEvent = await _moveNextFunction.Invoke(_enumeratorCancellationToken); + switch (nextEvent) + { + case TopicMessage.Text: + case TopicMessage.Binary: + Current = (TopicMessage)nextEvent; + return true; + case TopicMessage.Error: + Current = (TopicMessage)nextEvent; + return false; + // This enumerator excludes system events from the stream + case TopicSystemEvent.Discontinuity: + case TopicSystemEvent.Heartbeat: + continue; + default: + Current = null; + return false; + } + } + } + + public ValueTask DisposeAsync() + { + return new ValueTask(); + } + } + + private class AllTopicEventsEnumerator : IAsyncEnumerator + { + private readonly Func> _moveNextFunction; + private readonly CancellationToken _subscriptionCancellationToken; + private readonly CancellationToken _enumeratorCancellationToken; + + public AllTopicEventsEnumerator( + Func> moveNextFunction, + CancellationToken subscriptionCancellationToken, + CancellationToken enumeratorCancellationToken) + { + _moveNextFunction = moveNextFunction; + _subscriptionCancellationToken = subscriptionCancellationToken; + _enumeratorCancellationToken = enumeratorCancellationToken; + } + + public ITopicEvent? Current { get; private set; } + public async ValueTask MoveNextAsync() { if (_subscriptionCancellationToken.IsCancellationRequested || _enumeratorCancellationToken.IsCancellationRequested) @@ -102,15 +192,18 @@ public async ValueTask MoveNextAsync() return false; } - var nextMessage = await _moveNextFunction.Invoke(_enumeratorCancellationToken); - switch (nextMessage) + var nextEvent = await _moveNextFunction.Invoke(_enumeratorCancellationToken); + + switch (nextEvent) { case TopicMessage.Text: case TopicMessage.Binary: - Current = nextMessage; + case TopicSystemEvent.Discontinuity: + case TopicSystemEvent.Heartbeat: + Current = nextEvent; return true; case TopicMessage.Error: - Current = nextMessage; + Current = nextEvent; return false; default: Current = null; @@ -124,6 +217,23 @@ public ValueTask DisposeAsync() } } + // Helper class to wrap async enumerators into async enumerable + // This is necessary to support multiple enumerators on the same subscription. + private class AsyncEnumerableWrapper : IAsyncEnumerable + { + private readonly IAsyncEnumerator _asyncEnumerator; + + public AsyncEnumerableWrapper(IAsyncEnumerator asyncEnumerator) + { + _asyncEnumerator = asyncEnumerator; + } + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + return _asyncEnumerator; + } + } + /// public class Error : TopicSubscribeResponse, IError { diff --git a/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs b/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs new file mode 100644 index 00000000..e281382d --- /dev/null +++ b/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs @@ -0,0 +1,56 @@ +namespace Momento.Sdk.Responses; + +/// +/// Represents a system event that can be published to a topic. +/// +public abstract class TopicSystemEvent : ITopicEvent +{ + /// + /// Represents a heartbeat event. + /// + public class Heartbeat : TopicSystemEvent + { + /// + /// Constructs a new heartbeat event. + /// + public Heartbeat() + { + + } + + /// + public override string ToString() => base.ToString() ?? "Heartbeat"; + } + + /// + /// Represents a discontinuity event. + /// + public class Discontinuity : TopicSystemEvent + { + /// + /// Constructs a new discontinuity event. + /// + /// The last known sequence number before the discontinuity. + /// The sequence number of the discontinuity. + public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber) + { + LastKnownSequenceNumber = lastKnownSequenceNumber; + SequenceNumber = sequenceNumber; + } + + /// + /// The last known sequence number before the discontinuity. + /// + public long LastKnownSequenceNumber { get; } + /// + /// The sequence number of the discontinuity. + /// + public long SequenceNumber { get; } + + /// + public override string ToString() + { + return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber}"; + } + } +} diff --git a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs index fe1bc0cf..5f50868f 100644 --- a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs +++ b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs @@ -117,7 +117,7 @@ public async Task PublishAndSubscribe_ByteArray_Succeeds() [Fact(Timeout = 5000)] public async Task PublishAndSubscribe_String_Succeeds() { - const string topicName = "topic_string"; + var topicName = Utils.NewGuidString(); var valuesToSend = new List { "one", @@ -143,10 +143,66 @@ public async Task PublishAndSubscribe_String_Succeeds() Assert.Equal(valuesToSend.Count, consumedMessages.Count); for (var i = 0; i < valuesToSend.Count; ++i) { - Assert.Equal(((TopicMessage.Text)consumedMessages[i]).Value, valuesToSend[i]); + var textMessage = (TopicMessage.Text)consumedMessages[i]; + Assert.Equal(textMessage.Value, valuesToSend[i]); + Assert.Equal(textMessage.TopicSequenceNumber, i + 1); } } + [Fact(Timeout = 15000)] + public async Task PublishAndSubscribe_AllEventsString_Succeeds() + { + var topicName = Utils.NewGuidString(); + var valuesToSend = new List + { + "one", + "two", + "three", + "four", + "five" + }; + + var produceCancellation = new CancellationTokenSource(); + + // we don't need to put this on a different thread + var consumeTask = ConsumeAllEvents(topicName, produceCancellation.Token); + await Task.Delay(500); + + await ProduceMessages(topicName, valuesToSend); + await Task.Delay(10000); + + produceCancellation.Cancel(); + + var consumedEvents = await consumeTask; + var messageCount = 0; + var heartbeatCount = 0; + var discontinuityCount = 0; + foreach (var topicEvent in consumedEvents) + { + switch (topicEvent) + { + case TopicMessage.Text textMessage: + Assert.Equal(textMessage.Value, valuesToSend[messageCount]); + Assert.Equal(textMessage.TopicSequenceNumber, messageCount + 1); + messageCount++; + break; + case TopicSystemEvent.Heartbeat: + heartbeatCount++; + break; + case TopicSystemEvent.Discontinuity: + discontinuityCount++; + break; + default: + throw new Exception("bad message received"); + } + } + + Assert.Equal(valuesToSend.Count, messageCount); + Assert.True(heartbeatCount > 0); + Assert.Equal(0, discontinuityCount); + } + + private async Task ProduceMessages(string topicName, List valuesToSend) { foreach (var value in valuesToSend) @@ -207,6 +263,34 @@ private async Task> ConsumeMessages(string topicName, Cancell } } + private async Task> ConsumeAllEvents(string topicName, CancellationToken token) + { + var subscribeResponse = await topicClient.SubscribeAsync(cacheName, topicName); + switch (subscribeResponse) + { + case TopicSubscribeResponse.Subscription subscription: + var cancellableSubscription = subscription.WithCancellationForAllEvents(token); + var receivedSet = new List(); + await foreach (var topicEvent in cancellableSubscription) + { + switch (topicEvent) + { + case TopicMessage.Binary: + case TopicMessage.Text: + case TopicSystemEvent: + receivedSet.Add(topicEvent); + break; + default: + throw new Exception("bad message received"); + } + } + subscription.Dispose(); + return receivedSet; + default: + throw new Exception("subscription error"); + } + } + [Fact] public async Task MultipleSubscriptions_HappyPath() {