Skip to content

Commit

Permalink
feat: add an topic event enumerable (#575)
Browse files Browse the repository at this point in the history
Introduces an interface ITopicEvent which covers both normal
messages (TopicMessage) and system events (TopicSystemEvent). We
refactor the internals of the stream to be over ITopicEvents, so
that we can filter to just TopicMessage (as is implemented
currently) or pass through all events in an event enumerable.

To enumerate over all events in the stream, call GetAllEventsAsyncEnumerable,
or WithCancellationForAllEvents.

This largely parallels the existing enumerator over messages only. See the
integration tests for an example usage.
  • Loading branch information
malandis authored Sep 23, 2024
1 parent 6fc4140 commit e00c218
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 18 deletions.
13 changes: 7 additions & 6 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, strin
}

var response = new TopicSubscribeResponse.Subscription(
cancellationToken => subscriptionWrapper.GetNextRelevantMessageFromGrpcStreamAsync(cancellationToken),
cancellationToken => subscriptionWrapper.GetNextEventFromGrpcStreamAsync(cancellationToken),
subscriptionWrapper.Dispose);
return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicSubscribe, cacheName, topicName,
response);
Expand Down Expand Up @@ -198,7 +198,7 @@ public async Task Subscribe()
_subscribed = true;
}

public async ValueTask<TopicMessage?> GetNextRelevantMessageFromGrpcStreamAsync(
public async ValueTask<ITopicEvent?> GetNextEventFromGrpcStreamAsync(
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -251,10 +251,10 @@ public async Task Subscribe()
{
case _TopicValue.KindOneofCase.Text:
_logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName);
return new TopicMessage.Text(message.Item.Value, checked((long)message.Item.TopicSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.Binary:
_logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName);
return new TopicMessage.Binary(message.Item.Value, checked((long)message.Item.TopicSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.None:
default:
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
Expand All @@ -266,10 +266,11 @@ public async Task Subscribe()
_logger.LogTraceTopicDiscontinuityReceived(_cacheName, _topicName,
message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence);
_lastSequenceNumber = message.Discontinuity.NewTopicSequence;
break;
return new TopicSystemEvent.Discontinuity(checked((long)message.Discontinuity.LastTopicSequence),
checked((long)message.Discontinuity.NewTopicSequence));
case _SubscriptionItem.KindOneofCase.Heartbeat:
_logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName);
break;
return new TopicSystemEvent.Heartbeat();
case _SubscriptionItem.KindOneofCase.None:
_logger.LogTraceTopicMessageReceived("none", _cacheName, _topicName);
break;
Expand Down
12 changes: 12 additions & 0 deletions src/Momento.Sdk/Responses/Topic/ITopicEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Momento.Sdk.Responses;

/// <summary>
/// Represents an event that can be published to a topic.
///
/// This includes not just topic mesasges, but also system events
/// such as discontinuities and heartbeats.
/// </summary>
public interface ITopicEvent
{

}
15 changes: 14 additions & 1 deletion src/Momento.Sdk/Responses/Topic/TopicMessage.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Momento.Protos.CacheClient.Pubsub;
using Momento.Sdk.Exceptions;
using Momento.Sdk.Internal.ExtensionMethods;

namespace Momento.Sdk.Responses;

Expand Down Expand Up @@ -31,7 +32,7 @@ namespace Momento.Sdk.Responses;
/// }
/// </code>
/// </summary>
public abstract class TopicMessage
public abstract class TopicMessage : ITopicEvent
{
/// <summary>
/// A topic message containing a text value.
Expand Down Expand Up @@ -63,6 +64,12 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId =
/// This can be used to securely identify the sender of a message.
/// </summary>
public string? TokenId { get; }

/// <inheritdoc />
public override string ToString()
{
return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\"";
}
}

/// <summary>
Expand Down Expand Up @@ -95,6 +102,12 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId
/// This can be used to securely identify the sender of a message.
/// </summary>
public string? TokenId { get; }

/// <inheritdoc />
public override string ToString()
{
return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\"";
}
}

/// <include file="../../docs.xml" path='docs/class[@name="Error"]/description/*' />
Expand Down
128 changes: 119 additions & 9 deletions src/Momento.Sdk/Responses/Topic/TopicSubscribeResponse.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Momento.Sdk.Exceptions;
Expand Down Expand Up @@ -43,28 +44,60 @@ public abstract class TopicSubscribeResponse
/// </summary>
public class Subscription : TopicSubscribeResponse, IDisposable, IAsyncEnumerable<TopicMessage?>
{
private readonly Func<CancellationToken, ValueTask<TopicMessage?>> _moveNextFunction;
private readonly Func<CancellationToken, ValueTask<ITopicEvent?>> _moveNextFunction;
private CancellationTokenSource _subscriptionCancellationToken = new();
private readonly Action _disposalAction;

/// <summary>
/// Constructs a Subscription with a wrapped topic iterator and an action to dispose of it.
/// </summary>
public Subscription(Func<CancellationToken, ValueTask<TopicMessage?>> moveNextFunction, Action disposalAction)
public Subscription(Func<CancellationToken, ValueTask<ITopicEvent?>> moveNextFunction, Action disposalAction)
{
_moveNextFunction = moveNextFunction;
_disposalAction = disposalAction;
}

/// <summary>
/// Gets the enumerator for this topic. This subscription represents a single view on a topic, so multiple
/// Gets the message enumerator for this topic. Includes text and binary messages, but excludes system events.
/// </summary>
public IAsyncEnumerable<TopicMessage?> WithCancellation(CancellationToken cancellationToken)
{
return new AsyncEnumerableWrapper<TopicMessage?>(GetAsyncEnumerator(cancellationToken));
}

/// <summary>
/// Gets the event enumerator with cancellation for all topic events, including system events.
/// </summary>
public IAsyncEnumerable<ITopicEvent?> WithCancellationForAllEvents(CancellationToken cancellationToken)
{
return new AsyncEnumerableWrapper<ITopicEvent?>(GetAllEventsAsyncEnumerator(cancellationToken));
}

/// <summary>
/// Gets the message enumerator for this topic. Includes text and binary messages, but excludes system events.
///
/// This subscription represents a single view on a topic, so multiple
/// enumerators will interfere with each other.
/// </summary>
/// <param name="cancellationToken">A cancellation token to cancel the enumeration.</param>
/// <returns>An async enumerator for the topic messages.</returns>
public IAsyncEnumerator<TopicMessage?> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new TopicMessageEnumerator(_moveNextFunction, _subscriptionCancellationToken.Token, cancellationToken);
}

/// <summary>
/// Gets an enumerator for all events on this topic, including text and binary messages, and system events.
///
/// This subscription represents a single view on a topic, so multiple
/// enumerators will interfere with each other.
/// </summary>
/// <param name="cancellationToken">A cancellation token to cancel the enumeration.</param>
/// <returns>An async enumerator for all events on the topic.</returns>
IAsyncEnumerator<ITopicEvent?> GetAllEventsAsyncEnumerator(CancellationToken cancellationToken)
{
return new AllTopicEventsEnumerator(_moveNextFunction, _subscriptionCancellationToken.Token, cancellationToken);
}

/// <summary>
/// Unsubscribe from this topic.
Expand All @@ -78,12 +111,12 @@ public void Dispose()

private class TopicMessageEnumerator : IAsyncEnumerator<TopicMessage?>
{
private readonly Func<CancellationToken, ValueTask<TopicMessage?>> _moveNextFunction;
private readonly Func<CancellationToken, ValueTask<ITopicEvent?>> _moveNextFunction;
private readonly CancellationToken _subscriptionCancellationToken;
private readonly CancellationToken _enumeratorCancellationToken;

public TopicMessageEnumerator(
Func<CancellationToken, ValueTask<TopicMessage?>> moveNextFunction,
Func<CancellationToken, ValueTask<ITopicEvent?>> moveNextFunction,
CancellationToken subscriptionCancellationToken,
CancellationToken enumeratorCancellationToken)
{
Expand All @@ -94,6 +127,63 @@ public TopicMessageEnumerator(

public TopicMessage? Current { get; private set; }

public async ValueTask<bool> MoveNextAsync()
{
// We iterate over the stream until we get a TopicMessage, an error, or the stream is closed.
// We skip over system events like heartbeats and discontinuities.
while (true)
{
if (_subscriptionCancellationToken.IsCancellationRequested || _enumeratorCancellationToken.IsCancellationRequested)
{
Current = null;
return false;
}

var nextEvent = await _moveNextFunction.Invoke(_enumeratorCancellationToken);
switch (nextEvent)
{
case TopicMessage.Text:
case TopicMessage.Binary:
Current = (TopicMessage)nextEvent;
return true;
case TopicMessage.Error:
Current = (TopicMessage)nextEvent;
return false;
// This enumerator excludes system events from the stream
case TopicSystemEvent.Discontinuity:
case TopicSystemEvent.Heartbeat:
continue;
default:
Current = null;
return false;
}
}
}

public ValueTask DisposeAsync()
{
return new ValueTask();
}
}

private class AllTopicEventsEnumerator : IAsyncEnumerator<ITopicEvent?>
{
private readonly Func<CancellationToken, ValueTask<ITopicEvent?>> _moveNextFunction;
private readonly CancellationToken _subscriptionCancellationToken;
private readonly CancellationToken _enumeratorCancellationToken;

public AllTopicEventsEnumerator(
Func<CancellationToken, ValueTask<ITopicEvent?>> moveNextFunction,
CancellationToken subscriptionCancellationToken,
CancellationToken enumeratorCancellationToken)
{
_moveNextFunction = moveNextFunction;
_subscriptionCancellationToken = subscriptionCancellationToken;
_enumeratorCancellationToken = enumeratorCancellationToken;
}

public ITopicEvent? Current { get; private set; }

public async ValueTask<bool> MoveNextAsync()
{
if (_subscriptionCancellationToken.IsCancellationRequested || _enumeratorCancellationToken.IsCancellationRequested)
Expand All @@ -102,15 +192,18 @@ public async ValueTask<bool> MoveNextAsync()
return false;
}

var nextMessage = await _moveNextFunction.Invoke(_enumeratorCancellationToken);
switch (nextMessage)
var nextEvent = await _moveNextFunction.Invoke(_enumeratorCancellationToken);

switch (nextEvent)
{
case TopicMessage.Text:
case TopicMessage.Binary:
Current = nextMessage;
case TopicSystemEvent.Discontinuity:
case TopicSystemEvent.Heartbeat:
Current = nextEvent;
return true;
case TopicMessage.Error:
Current = nextMessage;
Current = nextEvent;
return false;
default:
Current = null;
Expand All @@ -124,6 +217,23 @@ public ValueTask DisposeAsync()
}
}

// Helper class to wrap async enumerators into async enumerable
// This is necessary to support multiple enumerators on the same subscription.
private class AsyncEnumerableWrapper<T> : IAsyncEnumerable<T>
{
private readonly IAsyncEnumerator<T> _asyncEnumerator;

public AsyncEnumerableWrapper(IAsyncEnumerator<T> asyncEnumerator)
{
_asyncEnumerator = asyncEnumerator;
}

public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return _asyncEnumerator;
}
}

/// <include file="../../docs.xml" path='docs/class[@name="Error"]/description/*' />
public class Error : TopicSubscribeResponse, IError
{
Expand Down
56 changes: 56 additions & 0 deletions src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
namespace Momento.Sdk.Responses;

/// <summary>
/// Represents a system event that can be published to a topic.
/// </summary>
public abstract class TopicSystemEvent : ITopicEvent
{
/// <summary>
/// Represents a heartbeat event.
/// </summary>
public class Heartbeat : TopicSystemEvent
{
/// <summary>
/// Constructs a new heartbeat event.
/// </summary>
public Heartbeat()
{

}

/// <inheritdoc/>
public override string ToString() => base.ToString() ?? "Heartbeat";
}

/// <summary>
/// Represents a discontinuity event.
/// </summary>
public class Discontinuity : TopicSystemEvent
{
/// <summary>
/// Constructs a new discontinuity event.
/// </summary>
/// <param name="lastKnownSequenceNumber">The last known sequence number before the discontinuity.</param>
/// <param name="sequenceNumber">The sequence number of the discontinuity.</param>
public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber)
{
LastKnownSequenceNumber = lastKnownSequenceNumber;
SequenceNumber = sequenceNumber;
}

/// <summary>
/// The last known sequence number before the discontinuity.
/// </summary>
public long LastKnownSequenceNumber { get; }
/// <summary>
/// The sequence number of the discontinuity.
/// </summary>
public long SequenceNumber { get; }

/// <inheritdoc/>
public override string ToString()
{
return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber}";
}
}
}
Loading

0 comments on commit e00c218

Please sign in to comment.