Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add an topic event enumerable #575

Merged
merged 7 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, strin
}

var response = new TopicSubscribeResponse.Subscription(
cancellationToken => subscriptionWrapper.GetNextRelevantMessageFromGrpcStreamAsync(cancellationToken),
cancellationToken => subscriptionWrapper.GetNextEventFromGrpcStreamAsync(cancellationToken),
subscriptionWrapper.Dispose);
return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicSubscribe, cacheName, topicName,
response);
Expand Down Expand Up @@ -198,7 +198,7 @@ public async Task Subscribe()
_subscribed = true;
}

public async ValueTask<TopicMessage?> GetNextRelevantMessageFromGrpcStreamAsync(
public async ValueTask<ITopicEvent?> GetNextEventFromGrpcStreamAsync(
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -251,10 +251,10 @@ public async Task Subscribe()
{
case _TopicValue.KindOneofCase.Text:
_logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName);
return new TopicMessage.Text(message.Item.Value, checked((long)message.Item.TopicSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.Binary:
_logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName);
return new TopicMessage.Binary(message.Item.Value, checked((long)message.Item.TopicSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.None:
default:
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
Expand All @@ -266,10 +266,11 @@ public async Task Subscribe()
_logger.LogTraceTopicDiscontinuityReceived(_cacheName, _topicName,
message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence);
_lastSequenceNumber = message.Discontinuity.NewTopicSequence;
break;
return new TopicSystemEvent.Discontinuity(checked((long)message.Discontinuity.LastTopicSequence),
checked((long)message.Discontinuity.NewTopicSequence));
case _SubscriptionItem.KindOneofCase.Heartbeat:
_logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName);
break;
return new TopicSystemEvent.Heartbeat();
case _SubscriptionItem.KindOneofCase.None:
_logger.LogTraceTopicMessageReceived("none", _cacheName, _topicName);
break;
Expand Down
12 changes: 12 additions & 0 deletions src/Momento.Sdk/Responses/Topic/ITopicEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Momento.Sdk.Responses;

/// <summary>
/// Represents an event that can be published to a topic.
///
/// This includes not just topic mesasges, but also system events
/// such as discontinuities and heartbeats.
/// </summary>
public interface ITopicEvent
{

}
15 changes: 14 additions & 1 deletion src/Momento.Sdk/Responses/Topic/TopicMessage.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Momento.Protos.CacheClient.Pubsub;
using Momento.Sdk.Exceptions;
using Momento.Sdk.Internal.ExtensionMethods;

namespace Momento.Sdk.Responses;

Expand Down Expand Up @@ -31,7 +32,7 @@ namespace Momento.Sdk.Responses;
/// }
/// </code>
/// </summary>
public abstract class TopicMessage
public abstract class TopicMessage : ITopicEvent
{
/// <summary>
/// A topic message containing a text value.
Expand Down Expand Up @@ -63,6 +64,12 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId =
/// This can be used to securely identify the sender of a message.
/// </summary>
public string? TokenId { get; }

/// <inheritdoc />
public override string ToString()
{
return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\"";
}
}

/// <summary>
Expand Down Expand Up @@ -95,6 +102,12 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId
/// This can be used to securely identify the sender of a message.
/// </summary>
public string? TokenId { get; }

/// <inheritdoc />
public override string ToString()
{
return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\"";
}
}

/// <include file="../../docs.xml" path='docs/class[@name="Error"]/description/*' />
Expand Down
128 changes: 119 additions & 9 deletions src/Momento.Sdk/Responses/Topic/TopicSubscribeResponse.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Momento.Sdk.Exceptions;
Expand Down Expand Up @@ -43,28 +44,60 @@ public abstract class TopicSubscribeResponse
/// </summary>
public class Subscription : TopicSubscribeResponse, IDisposable, IAsyncEnumerable<TopicMessage?>
{
private readonly Func<CancellationToken, ValueTask<TopicMessage?>> _moveNextFunction;
private readonly Func<CancellationToken, ValueTask<ITopicEvent?>> _moveNextFunction;
private CancellationTokenSource _subscriptionCancellationToken = new();
private readonly Action _disposalAction;

/// <summary>
/// Constructs a Subscription with a wrapped topic iterator and an action to dispose of it.
/// </summary>
public Subscription(Func<CancellationToken, ValueTask<TopicMessage?>> moveNextFunction, Action disposalAction)
public Subscription(Func<CancellationToken, ValueTask<ITopicEvent?>> moveNextFunction, Action disposalAction)
{
_moveNextFunction = moveNextFunction;
_disposalAction = disposalAction;
}

/// <summary>
/// Gets the enumerator for this topic. This subscription represents a single view on a topic, so multiple
/// Gets the message enumerator for this topic. Includes text and binary messages, but excludes system events.
/// </summary>
public IAsyncEnumerable<TopicMessage?> WithCancellation(CancellationToken cancellationToken)
{
return new AsyncEnumerableWrapper<TopicMessage?>(GetAsyncEnumerator(cancellationToken));
}

/// <summary>
/// Gets the event enumerator with cancellation for all topic events, including system events.
/// </summary>
public IAsyncEnumerable<ITopicEvent?> WithCancellationForAllEvents(CancellationToken cancellationToken)
{
return new AsyncEnumerableWrapper<ITopicEvent?>(GetAllEventsAsyncEnumerator(cancellationToken));
}

/// <summary>
/// Gets the message enumerator for this topic. Includes text and binary messages, but excludes system events.
///
/// This subscription represents a single view on a topic, so multiple
/// enumerators will interfere with each other.
/// </summary>
/// <param name="cancellationToken">A cancellation token to cancel the enumeration.</param>
/// <returns>An async enumerator for the topic messages.</returns>
public IAsyncEnumerator<TopicMessage?> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new TopicMessageEnumerator(_moveNextFunction, _subscriptionCancellationToken.Token, cancellationToken);
}

/// <summary>
/// Gets an enumerator for all events on this topic, including text and binary messages, and system events.
///
/// This subscription represents a single view on a topic, so multiple
/// enumerators will interfere with each other.
/// </summary>
/// <param name="cancellationToken">A cancellation token to cancel the enumeration.</param>
/// <returns>An async enumerator for all events on the topic.</returns>
IAsyncEnumerator<ITopicEvent?> GetAllEventsAsyncEnumerator(CancellationToken cancellationToken)
{
return new AllTopicEventsEnumerator(_moveNextFunction, _subscriptionCancellationToken.Token, cancellationToken);
}

/// <summary>
/// Unsubscribe from this topic.
Expand All @@ -78,12 +111,12 @@ public void Dispose()

private class TopicMessageEnumerator : IAsyncEnumerator<TopicMessage?>
{
private readonly Func<CancellationToken, ValueTask<TopicMessage?>> _moveNextFunction;
private readonly Func<CancellationToken, ValueTask<ITopicEvent?>> _moveNextFunction;
private readonly CancellationToken _subscriptionCancellationToken;
private readonly CancellationToken _enumeratorCancellationToken;

public TopicMessageEnumerator(
Func<CancellationToken, ValueTask<TopicMessage?>> moveNextFunction,
Func<CancellationToken, ValueTask<ITopicEvent?>> moveNextFunction,
CancellationToken subscriptionCancellationToken,
CancellationToken enumeratorCancellationToken)
{
Expand All @@ -94,6 +127,63 @@ public TopicMessageEnumerator(

public TopicMessage? Current { get; private set; }

public async ValueTask<bool> MoveNextAsync()
{
// We iterate over the stream until we get a TopicMessage, an error, or the stream is closed.
// We skip over system events like heartbeats and discontinuities.
while (true)
{
if (_subscriptionCancellationToken.IsCancellationRequested || _enumeratorCancellationToken.IsCancellationRequested)
{
Current = null;
return false;
}

var nextEvent = await _moveNextFunction.Invoke(_enumeratorCancellationToken);
switch (nextEvent)
{
case TopicMessage.Text:
case TopicMessage.Binary:
Current = (TopicMessage)nextEvent;
return true;
case TopicMessage.Error:
Current = (TopicMessage)nextEvent;
return false;
// This enumerator excludes system events from the stream
case TopicSystemEvent.Discontinuity:
case TopicSystemEvent.Heartbeat:
continue;
default:
Current = null;
return false;
}
}
}

public ValueTask DisposeAsync()
{
return new ValueTask();
}
}

private class AllTopicEventsEnumerator : IAsyncEnumerator<ITopicEvent?>
{
private readonly Func<CancellationToken, ValueTask<ITopicEvent?>> _moveNextFunction;
private readonly CancellationToken _subscriptionCancellationToken;
private readonly CancellationToken _enumeratorCancellationToken;

public AllTopicEventsEnumerator(
Func<CancellationToken, ValueTask<ITopicEvent?>> moveNextFunction,
CancellationToken subscriptionCancellationToken,
CancellationToken enumeratorCancellationToken)
{
_moveNextFunction = moveNextFunction;
_subscriptionCancellationToken = subscriptionCancellationToken;
_enumeratorCancellationToken = enumeratorCancellationToken;
}

public ITopicEvent? Current { get; private set; }

public async ValueTask<bool> MoveNextAsync()
{
if (_subscriptionCancellationToken.IsCancellationRequested || _enumeratorCancellationToken.IsCancellationRequested)
Expand All @@ -102,15 +192,18 @@ public async ValueTask<bool> MoveNextAsync()
return false;
}

var nextMessage = await _moveNextFunction.Invoke(_enumeratorCancellationToken);
switch (nextMessage)
var nextEvent = await _moveNextFunction.Invoke(_enumeratorCancellationToken);

switch (nextEvent)
{
case TopicMessage.Text:
case TopicMessage.Binary:
Current = nextMessage;
case TopicSystemEvent.Discontinuity:
case TopicSystemEvent.Heartbeat:
Current = nextEvent;
return true;
case TopicMessage.Error:
Current = nextMessage;
Current = nextEvent;
return false;
default:
Current = null;
Expand All @@ -124,6 +217,23 @@ public ValueTask DisposeAsync()
}
}

// Helper class to wrap async enumerators into async enumerable
// This is necessary to support multiple enumerators on the same subscription.
private class AsyncEnumerableWrapper<T> : IAsyncEnumerable<T>
{
private readonly IAsyncEnumerator<T> _asyncEnumerator;

public AsyncEnumerableWrapper(IAsyncEnumerator<T> asyncEnumerator)
{
_asyncEnumerator = asyncEnumerator;
}

public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return _asyncEnumerator;
}
}

/// <include file="../../docs.xml" path='docs/class[@name="Error"]/description/*' />
public class Error : TopicSubscribeResponse, IError
{
Expand Down
56 changes: 56 additions & 0 deletions src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
namespace Momento.Sdk.Responses;

/// <summary>
/// Represents a system event that can be published to a topic.
/// </summary>
public abstract class TopicSystemEvent : ITopicEvent
{
/// <summary>
/// Represents a heartbeat event.
/// </summary>
public class Heartbeat : TopicSystemEvent
{
/// <summary>
/// Constructs a new heartbeat event.
/// </summary>
public Heartbeat()
{

}

/// <inheritdoc/>
public override string ToString() => base.ToString() ?? "Heartbeat";
}

/// <summary>
/// Represents a discontinuity event.
/// </summary>
public class Discontinuity : TopicSystemEvent
{
/// <summary>
/// Constructs a new discontinuity event.
/// </summary>
/// <param name="lastKnownSequenceNumber">The last known sequence number before the discontinuity.</param>
/// <param name="sequenceNumber">The sequence number of the discontinuity.</param>
public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber)
{
LastKnownSequenceNumber = lastKnownSequenceNumber;
SequenceNumber = sequenceNumber;
}

/// <summary>
/// The last known sequence number before the discontinuity.
/// </summary>
public long LastKnownSequenceNumber { get; }
/// <summary>
/// The sequence number of the discontinuity.
/// </summary>
public long SequenceNumber { get; }

/// <inheritdoc/>
public override string ToString()
{
return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber}";
}
}
}
Loading
Loading