Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add an topic event enumerable #575

Merged
merged 7 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public async Task Subscribe()
_subscribed = true;
}

public async ValueTask<TopicMessage?> GetNextRelevantMessageFromGrpcStreamAsync(
public async ValueTask<ITopicEvent?> GetNextRelevantMessageFromGrpcStreamAsync(
malandis marked this conversation as resolved.
Show resolved Hide resolved
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
Expand Down
12 changes: 12 additions & 0 deletions src/Momento.Sdk/Responses/Topic/ITopicEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Momento.Sdk.Responses;

/// <summary>
/// Represents an event that can be published to a topic.
///
/// This includes not just topic mesasges, but also system events
/// such as discontinuities and heartbeats.
/// </summary>
public interface ITopicEvent
{

}
15 changes: 14 additions & 1 deletion src/Momento.Sdk/Responses/Topic/TopicMessage.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Momento.Protos.CacheClient.Pubsub;
using Momento.Sdk.Exceptions;
using Momento.Sdk.Internal.ExtensionMethods;

namespace Momento.Sdk.Responses;

Expand Down Expand Up @@ -31,7 +32,7 @@ namespace Momento.Sdk.Responses;
/// }
/// </code>
/// </summary>
public abstract class TopicMessage
public abstract class TopicMessage : ITopicEvent
{
/// <summary>
/// A topic message containing a text value.
Expand Down Expand Up @@ -63,6 +64,12 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId =
/// This can be used to securely identify the sender of a message.
/// </summary>
public string? TokenId { get; }

/// <inheritdoc />
public override string ToString()
{
return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\"";
}
}

/// <summary>
Expand Down Expand Up @@ -95,6 +102,12 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId
/// This can be used to securely identify the sender of a message.
/// </summary>
public string? TokenId { get; }

/// <inheritdoc />
public override string ToString()
{
return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\"";
}
}

/// <include file="../../docs.xml" path='docs/class[@name="Error"]/description/*' />
Expand Down
12 changes: 6 additions & 6 deletions src/Momento.Sdk/Responses/Topic/TopicSubscribeResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ public abstract class TopicSubscribeResponse
/// </summary>
public class Subscription : TopicSubscribeResponse, IDisposable, IAsyncEnumerable<TopicMessage?>
{
private readonly Func<CancellationToken, ValueTask<TopicMessage?>> _moveNextFunction;
private readonly Func<CancellationToken, ValueTask<ITopicEvent?>> _moveNextFunction;
private CancellationTokenSource _subscriptionCancellationToken = new();
private readonly Action _disposalAction;

/// <summary>
/// Constructs a Subscription with a wrapped topic iterator and an action to dispose of it.
/// </summary>
public Subscription(Func<CancellationToken, ValueTask<TopicMessage?>> moveNextFunction, Action disposalAction)
public Subscription(Func<CancellationToken, ValueTask<ITopicEvent?>> moveNextFunction, Action disposalAction)
{
_moveNextFunction = moveNextFunction;
_disposalAction = disposalAction;
Expand Down Expand Up @@ -78,12 +78,12 @@ public void Dispose()

private class TopicMessageEnumerator : IAsyncEnumerator<TopicMessage?>
{
private readonly Func<CancellationToken, ValueTask<TopicMessage?>> _moveNextFunction;
private readonly Func<CancellationToken, ValueTask<ITopicEvent?>> _moveNextFunction;
private readonly CancellationToken _subscriptionCancellationToken;
private readonly CancellationToken _enumeratorCancellationToken;

public TopicMessageEnumerator(
Func<CancellationToken, ValueTask<TopicMessage?>> moveNextFunction,
Func<CancellationToken, ValueTask<ITopicEvent?>> moveNextFunction,
CancellationToken subscriptionCancellationToken,
CancellationToken enumeratorCancellationToken)
{
Expand All @@ -107,10 +107,10 @@ public async ValueTask<bool> MoveNextAsync()
{
case TopicMessage.Text:
case TopicMessage.Binary:
Current = nextMessage;
Current = (TopicMessage)nextMessage;
return true;
case TopicMessage.Error:
Current = nextMessage;
Current = (TopicMessage)nextMessage;
return false;
default:
Current = null;
Expand Down
56 changes: 56 additions & 0 deletions src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
namespace Momento.Sdk.Responses;

/// <summary>
/// Represents a system event that can be published to a topic.
/// </summary>
public abstract class TopicSystemEvent : ITopicEvent
{
/// <summary>
/// Represents a heartbeat event.
/// </summary>
public class Heartbeat : TopicSystemEvent
{
/// <summary>
/// Constructs a new heartbeat event.
/// </summary>
public Heartbeat()
{

}

/// <inheritdoc/>
public override string ToString() => base.ToString() ?? "Heartbeat";
}

/// <summary>
/// Represents a discontinuity event.
/// </summary>
public class Discontinuity : TopicSystemEvent
{
/// <summary>
/// Constructs a new discontinuity event.
/// </summary>
/// <param name="lastKnownSequenceNumber">The last known sequence number before the discontinuity.</param>
/// <param name="sequenceNumber">The sequence number of the discontinuity.</param>
public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber)
{
LastKnownSequenceNumber = lastKnownSequenceNumber;
SequenceNumber = sequenceNumber;
}

/// <summary>
/// The last known sequence number before the discontinuity.
/// </summary>
public long LastKnownSequenceNumber { get; }
/// <summary>
/// The sequence number of the discontinuity.
/// </summary>
public long SequenceNumber { get; }

/// <inheritdoc/>
public override string ToString()
{
return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber}";
}
}
}
Loading