Skip to content

Commit

Permalink
feat: add topics
Browse files Browse the repository at this point in the history
Add the initial topics implementation. The entry point is TopicClient,
which contains the publish and subscribe methods. Subscribe returns an
IAsyncEnumerable<TopicMessage> that can be iterated over to read from
the topic.
  • Loading branch information
nand4011 committed Sep 1, 2023
1 parent 062b06d commit e10ae02
Show file tree
Hide file tree
Showing 11 changed files with 1,120 additions and 1 deletion.
65 changes: 65 additions & 0 deletions src/Momento.Sdk/ITopicClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using System.Threading.Tasks;
using Momento.Sdk.Responses;

namespace Momento.Sdk;

/// <summary>
/// Minimum viable functionality of a topic client.
/// </summary>
public interface ITopicClient : IDisposable
{
/// <summary>
/// Publish a value to a topic in a cache.
/// </summary>
/// <param name="cacheName">Name of the cache containing the topic.</param>
/// <param name="topicName">Name of the topic.</param>
/// <param name="value">The value to be published.</param>
/// <returns>
/// Task object representing the result of the publish operation. The
/// response object is resolved to a type-safe object of one of
/// the following subtypes:
/// <list type="bullet">
/// <item><description>TopicPublishResponse.Success</description></item>
/// <item><description>TopicPublishResponse.Error</description></item>
/// </list>
/// Pattern matching can be used to operate on the appropriate subtype.
/// For example:
/// <code>
/// if (response is TopicPublishResponse.Error errorResponse)
/// {
/// // handle error as appropriate
/// }
/// </code>
/// </returns>
public Task<TopicPublishResponse> PublishAsync(string cacheName, string topicName, byte[] value);

/// <inheritdoc cref="PublishAsync(string, string, byte[])"/>
public Task<TopicPublishResponse> PublishAsync(string cacheName, string topicName, string value);

/// <summary>
/// Subscribe to a topic. The returned value can be used to iterate over newly published messages on the topic.
/// </summary>
/// <param name="cacheName">Name of the cache containing the topic.</param>
/// <param name="topicName">Name of the topic.</param>
/// <param name="resumeAtSequenceNumber">The sequence number of the last message.
/// If provided, the client will attempt to start the stream from that sequence number.</param>
/// <returns>
/// Task object representing the result of the subscribe operation. The
/// response object is resolved to a type-safe object of one of
/// the following subtypes:
/// <list type="bullet">
/// <item><description>TopicSubscribeResponse.Subscription</description></item>
/// <item><description>TopicSubscribeResponse.Error</description></item>
/// </list>
/// Pattern matching can be used to operate on the appropriate subtype.
/// For example:
/// <code>
/// if (response is TopicSubscribeResponse.Error errorResponse)
/// {
/// // handle error as appropriate
/// }
/// </code>
/// </returns>
public Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null);
}
69 changes: 69 additions & 0 deletions src/Momento.Sdk/Internal/LoggingUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,75 @@ public static TSuccess LogTraceCollectionRequestSuccess<TSuccess>(this ILogger _
}
return success;
}

/// <summary>
/// Logs a message at TRACE level that indicates that a topic request is about to be executed.
/// </summary>
/// <param name="logger"></param>
/// <param name="requestType"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
public static void LogTraceExecutingTopicRequest(this ILogger logger, string requestType, string cacheName, string topicName)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Executing '{}' request: cacheName: {}; topicName: {}", requestType, cacheName, topicName);
}
}

/// <summary>
/// Logs a message at TRACE level that indicates that a topic request resulted in an error.
/// </summary>
/// <typeparam name="TError"></typeparam>
/// <param name="logger"></param>
/// <param name="requestType"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
/// <param name="error"></param>
/// <returns></returns>
public static TError LogTraceTopicRequestError<TError>(this ILogger logger, string requestType, string cacheName, string topicName, TError error)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("An error occurred while executing a '{}' request: cacheName: {}; topicName: {}; error: {}", requestType, cacheName, topicName, error);
}
return error;
}

/// <summary>
/// /// Logs a message at TRACE level that indicates that a topic request resulted in a success.
/// </summary>
/// <typeparam name="TSuccess"></typeparam>
/// <param name="logger"></param>
/// <param name="requestType"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
/// <param name="success"></param>
/// <returns></returns>
public static TSuccess LogTraceTopicRequestSuccess<TSuccess>(this ILogger logger, string requestType, string cacheName, string topicName, TSuccess success)
{

if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Successfully executed '{}' request: cacheName: {}; topicName: {}; success: {}", requestType, cacheName, topicName, success);
}
return success;
}

/// <summary>
/// Logs a message at TRACE level that indicates that a topic message was received.
/// </summary>
/// <param name="logger"></param>
/// <param name="messageType"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
public static void LogTraceTopicMessageReceived(this ILogger logger, string messageType, string cacheName, string topicName)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Received '{}' message on: cacheName: {}; topicName: {}", messageType, cacheName, topicName);
}
}

private static string ReadableByteString(ByteString? input)
{
Expand Down
183 changes: 183 additions & 0 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Momento.Protos.CacheClient.Pubsub;
using Momento.Sdk.Config;
using Momento.Sdk.Exceptions;
using Momento.Sdk.Internal.ExtensionMethods;
using Momento.Sdk.Responses;

namespace Momento.Sdk.Internal;

public class ScsTopicClientBase : IDisposable

Check warning on line 14 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase'

Check warning on line 14 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase'
{
protected readonly TopicGrpcManager grpcManager;

Check warning on line 16 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.grpcManager'

Check warning on line 16 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.grpcManager'
private readonly TimeSpan dataClientOperationTimeout;
private readonly ILogger _logger;

protected readonly CacheExceptionMapper _exceptionMapper;

Check warning on line 20 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase._exceptionMapper'

Check warning on line 20 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase._exceptionMapper'

public ScsTopicClientBase(IConfiguration config, string authToken, string endpoint)

Check warning on line 22 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.ScsTopicClientBase(IConfiguration, string, string)'

Check warning on line 22 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.ScsTopicClientBase(IConfiguration, string, string)'
{
this.grpcManager = new TopicGrpcManager(config, authToken, endpoint);
this.dataClientOperationTimeout = config.TransportStrategy.GrpcConfig.Deadline;
this._logger = config.LoggerFactory.CreateLogger<ScsDataClient>();
this._exceptionMapper = new CacheExceptionMapper(config.LoggerFactory);
}

protected Metadata MetadataWithCache(string cacheName)

Check warning on line 30 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.MetadataWithCache(string)'

Check warning on line 30 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.MetadataWithCache(string)'
{
return new Metadata() { { "cache", cacheName } };
}

protected DateTime CalculateDeadline()

Check warning on line 35 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.CalculateDeadline()'

Check warning on line 35 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.CalculateDeadline()'
{
return DateTime.UtcNow.Add(dataClientOperationTimeout);
}

public void Dispose()

Check warning on line 40 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.Dispose()'

Check warning on line 40 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.Dispose()'
{
this.grpcManager.Dispose();
}
}

internal sealed class ScsTopicClient : ScsTopicClientBase
{
private readonly ILogger _logger;

public ScsTopicClient(IConfiguration config, string authToken, string endpoint)
: base(config, authToken, endpoint)
{
this._logger = config.LoggerFactory.CreateLogger<ScsTopicClient>();
}

public async Task<TopicPublishResponse> Publish(string cacheName, string topicName, byte[] value)
{
var topicValue = new _TopicValue
{
Binary = value.ToByteString()
};
return await SendPublish(cacheName, topicName, topicValue);
}

public async Task<TopicPublishResponse> Publish(string cacheName, string topicName, string value)
{
var topicValue = new _TopicValue
{
Text = value
};
return await SendPublish(cacheName, topicName, topicValue);
}

public async Task<TopicSubscribeResponse> Subscribe(string cacheName, string topicName,
ulong? resumeAtTopicSequenceNumber = null)
{
return await SendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber);
}

private const string RequestTypeTopicPublish = "TOPIC_PUBLISH";

private async Task<TopicPublishResponse> SendPublish(string cacheName, string topicName, _TopicValue value)
{
_PublishRequest request = new _PublishRequest
{
CacheName = cacheName,
Topic = topicName,
Value = value
};

try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicPublish, cacheName, topicName);
await grpcManager.Client.publish(request, new CallOptions(deadline: CalculateDeadline()));
}
catch (Exception e)
{
return _logger.LogTraceTopicRequestError(RequestTypeTopicPublish, cacheName, topicName,
new TopicPublishResponse.Error(_exceptionMapper.Convert(e)));
}

return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicPublish, cacheName, topicName,
new TopicPublishResponse.Success());
}

private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, string topicName,

Check warning on line 106 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 106 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
ulong? resumeAtTopicSequenceNumber)
{
var request = new _SubscriptionRequest
{
CacheName = cacheName,
Topic = topicName
};
if (resumeAtTopicSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value;
}

AsyncServerStreamingCall<_SubscriptionItem> subscription;
try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicPublish, cacheName, topicName);
subscription = grpcManager.Client.subscribe(request, new CallOptions());
}
catch (Exception e)
{
return _logger.LogTraceTopicRequestError(RequestTypeTopicPublish, cacheName, topicName,
new TopicSubscribeResponse.Error(_exceptionMapper.Convert(e)));
}

var response = new TopicSubscribeResponse.Subscription(
token => MoveNextAsync(subscription, token, cacheName, topicName),
() => subscription.Dispose());
return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicPublish, cacheName, topicName,
response);
}

private async ValueTask<TopicMessage?> MoveNextAsync(AsyncServerStreamingCall<_SubscriptionItem> subscription,
CancellationToken cancellationToken, string cacheName, string topicName)
{
if (cancellationToken.IsCancellationRequested)
{
return null;
}

try
{
while (await subscription.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false))
{
var message = subscription.ResponseStream.Current;

switch (message.KindCase)
{
case _SubscriptionItem.KindOneofCase.Item:
_logger.LogTraceTopicMessageReceived("item", cacheName, topicName);
return new TopicMessage.Item(message.Item);
case _SubscriptionItem.KindOneofCase.Discontinuity:
_logger.LogTraceTopicMessageReceived("discontinuity", cacheName, topicName);
break;
case _SubscriptionItem.KindOneofCase.Heartbeat:
_logger.LogTraceTopicMessageReceived("heartbeat", cacheName, topicName);
break;
case _SubscriptionItem.KindOneofCase.None:
_logger.LogTraceTopicMessageReceived("none", cacheName, topicName);
break;
default:
_logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
break;
}
}
}
catch (OperationCanceledException)
{
return null;
}
catch (Exception e)
{
return new TopicMessage.Error(_exceptionMapper.Convert(e));
}

return null;
}
}
Loading

0 comments on commit e10ae02

Please sign in to comment.