Skip to content

Commit

Permalink
feat: add topics (#468)
Browse files Browse the repository at this point in the history
Add the initial topics implementation. The entry point is TopicClient,
which contains the publish and subscribe methods. Subscribe returns an
IAsyncEnumerable<TopicMessage> that can be iterated over to read from
the topic.

TopicMessage is either a Text, a Binary, or an Error.

Add new configurations for creating a topic client.

---------

Co-authored-by: Kenny <[email protected]>
  • Loading branch information
nand4011 and kvcache authored Sep 6, 2023
1 parent 6fbdc85 commit 5ce2ff7
Show file tree
Hide file tree
Showing 19 changed files with 1,458 additions and 4 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ jobs:
include:
- os: ubuntu-latest
target-framework: net6.0
- os: windows-latest
target-framework: net6.0
- os: windows-latest
target-framework: net461
runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -48,10 +50,10 @@ jobs:
run: dotnet build

- name: Unit Test
run: dotnet test -f ${{ matrix.target-framework }} tests/Unit/Momento.Sdk.Tests
run: dotnet test --logger "console;verbosity=detailed" -f ${{ matrix.target-framework }} tests/Unit/Momento.Sdk.Tests

- name: Integration Test
run: dotnet test -f ${{ matrix.target-framework }} tests/Integration/Momento.Sdk.Tests
run: dotnet test --logger "console;verbosity=detailed" -f ${{ matrix.target-framework }} tests/Integration/Momento.Sdk.Tests

build_examples:
runs-on: ubuntu-latest
Expand Down
23 changes: 23 additions & 0 deletions src/Momento.Sdk/Config/ITopicConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Microsoft.Extensions.Logging;
using Momento.Sdk.Config.Transport;

namespace Momento.Sdk.Config;


/// <summary>
/// Contract for Topic SDK configurables.
/// </summary>
public interface ITopicConfiguration
{
/// <inheritdoc cref="Microsoft.Extensions.Logging.ILoggerFactory" />
public ILoggerFactory LoggerFactory { get; }
/// <inheritdoc cref="Momento.Sdk.Config.Transport.ITransportStrategy" />
public ITopicTransportStrategy TransportStrategy { get; }

/// <summary>
/// Creates a new instance of the Configuration object, updated to use the specified transport strategy.
/// </summary>
/// <param name="transportStrategy">This is responsible for configuring network tunables.</param>
/// <returns>Configuration object with custom transport strategy provided</returns>
public ITopicConfiguration WithTransportStrategy(ITopicTransportStrategy transportStrategy);
}
50 changes: 50 additions & 0 deletions src/Momento.Sdk/Config/TopicConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using Microsoft.Extensions.Logging;
using Momento.Sdk.Config.Transport;

namespace Momento.Sdk.Config;

/// <inheritdoc cref="Momento.Sdk.Config.ITopicConfiguration" />
public class TopicConfiguration : ITopicConfiguration
{
/// <inheritdoc />
public ILoggerFactory LoggerFactory { get; }

/// <inheritdoc />
public ITopicTransportStrategy TransportStrategy { get; }

/// <summary>
/// Create a new instance of a Topic Configuration object with provided arguments: <see cref="Momento.Sdk.Config.ITopicConfiguration.TransportStrategy"/>, and <see cref="Momento.Sdk.Config.ITopicConfiguration.LoggerFactory"/>
/// </summary>
/// <param name="transportStrategy">This is responsible for configuring network tunables.</param>
/// <param name="loggerFactory">This is responsible for configuring logging.</param>
public TopicConfiguration(ILoggerFactory loggerFactory, ITopicTransportStrategy transportStrategy)
{
LoggerFactory = loggerFactory;
TransportStrategy = transportStrategy;
}

/// <inheritdoc />
public ITopicConfiguration WithTransportStrategy(ITopicTransportStrategy transportStrategy)
{
return new TopicConfiguration(LoggerFactory, transportStrategy);
}

/// <inheritdoc />
public override bool Equals(object obj)
{
if ((obj == null) || !this.GetType().Equals(obj.GetType()))
{
return false;
}

var other = (Configuration)obj;
return TransportStrategy.Equals(other.TransportStrategy) &&
LoggerFactory.Equals(other.LoggerFactory);
}

/// <inheritdoc />
public override int GetHashCode()
{
return base.GetHashCode();
}
}
73 changes: 73 additions & 0 deletions src/Momento.Sdk/Config/TopicConfigurations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Momento.Sdk.Config.Transport;

namespace Momento.Sdk.Config;

/// <summary>
/// Provide pre-built topic configurations.
/// </summary>
public class TopicConfigurations
{
/// <summary>
/// Laptop config provides defaults suitable for a medium-to-high-latency environment. Permissive timeouts, retries, and
/// relaxed latency and throughput targets.
/// </summary>
public class Laptop : TopicConfiguration
{
private Laptop(ILoggerFactory loggerFactory, ITopicTransportStrategy transportStrategy) : base(loggerFactory,
transportStrategy)
{
}

/// <summary>
/// Provides the latest recommended configuration for a Laptop environment.
/// </summary>
/// <remark>
/// This configuration may change in future releases to take advantage of
/// improvements we identify for default configurations.
/// </remark>
/// <param name="loggerFactory"></param>
/// <returns></returns>
public static ITopicConfiguration latest(ILoggerFactory? loggerFactory = null)
{
var finalLoggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
ITopicTransportStrategy transportStrategy = new StaticTopicTransportStrategy(
loggerFactory: finalLoggerFactory,
grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(15000))
);
return new Laptop(finalLoggerFactory, transportStrategy);
}
}

/// <summary>
/// Mobile config provides defaults suitable for a medium-to-high-latency mobile environment.
/// </summary>
public class Mobile : TopicConfiguration
{
private Mobile(ILoggerFactory loggerFactory, ITopicTransportStrategy transportStrategy) : base(loggerFactory,
transportStrategy)
{
}

/// <summary>
/// Provides the latest recommended configuration for a Mobile environment.
/// </summary>
/// <remark>
/// This configuration may change in future releases to take advantage of
/// improvements we identify for default configurations.
/// </remark>
/// <param name="loggerFactory"></param>
/// <returns></returns>
public static ITopicConfiguration latest(ILoggerFactory? loggerFactory = null)
{
var finalLoggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
ITopicTransportStrategy transportStrategy = new StaticTopicTransportStrategy(
loggerFactory: finalLoggerFactory,
grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(15000))
);
return new Mobile(finalLoggerFactory, transportStrategy);
}
}
}
29 changes: 29 additions & 0 deletions src/Momento.Sdk/Config/Transport/ITopicTransportStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;

namespace Momento.Sdk.Config.Transport;

/// <summary>
/// This is responsible for configuring network tunables for topics.
/// </summary>
public interface ITopicTransportStrategy
{
/// <summary>
/// Configures the low-level gRPC settings for the Momento Topic client's communication
/// with the Momento server.
/// </summary>
public IGrpcConfiguration GrpcConfig { get; }

/// <summary>
/// Copy constructor to update the gRPC configuration
/// </summary>
/// <param name="grpcConfig"></param>
/// <returns>A new ITransportStrategy with the specified grpcConfig</returns>
public ITopicTransportStrategy WithGrpcConfig(IGrpcConfiguration grpcConfig);

/// <summary>
/// Copy constructor to update the client timeout for publishing
/// </summary>
/// <param name="clientTimeout"></param>
/// <returns>A new ITransportStrategy with the specified client timeout</returns>
public ITopicTransportStrategy WithClientTimeout(TimeSpan clientTimeout);
}
64 changes: 64 additions & 0 deletions src/Momento.Sdk/Config/Transport/StaticTopicTransportStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System;
using Microsoft.Extensions.Logging;

namespace Momento.Sdk.Config.Transport;

/// <summary>
/// The simplest way to configure the transport layer for the Momento Topic client.
/// Provides static values for the gRPC configuration.
/// </summary>
public class StaticTopicTransportStrategy : ITopicTransportStrategy
{
private readonly ILoggerFactory _loggerFactory;

/// <inheritdoc />
public IGrpcConfiguration GrpcConfig { get; }

/// <summary>
///
/// </summary>
/// <param name="loggerFactory"></param>
/// <param name="grpcConfig">Configures how Momento Topic client interacts with the Momento service via gRPC</param>
public StaticTopicTransportStrategy(ILoggerFactory loggerFactory, IGrpcConfiguration grpcConfig)
{
_loggerFactory = loggerFactory;
GrpcConfig = grpcConfig;
}

/// <inheritdoc/>
public ITopicTransportStrategy WithGrpcConfig(IGrpcConfiguration grpcConfig)
{
return new StaticTopicTransportStrategy(_loggerFactory, grpcConfig);
}

/// <inheritdoc/>
public ITopicTransportStrategy WithClientTimeout(TimeSpan clientTimeout)
{
return new StaticTopicTransportStrategy(_loggerFactory, GrpcConfig.WithDeadline(clientTimeout));
}

/// <summary>
/// Test equality by value.
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public override bool Equals(object obj)
{
if ((obj == null) || !this.GetType().Equals(obj.GetType()))
{
return false;
}

var other = (StaticTransportStrategy)obj;
return GrpcConfig.Equals(other.GrpcConfig);
}

/// <summary>
/// Trivial hash code implementation.
/// </summary>
/// <returns></returns>
public override int GetHashCode()
{
return base.GetHashCode();
}
}
69 changes: 69 additions & 0 deletions src/Momento.Sdk/ITopicClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System;
#if NETSTANDARD2_0_OR_GREATER


using System.Threading.Tasks;
using Momento.Sdk.Responses;

namespace Momento.Sdk;

/// <summary>
/// Minimum viable functionality of a topic client.
/// </summary>
public interface ITopicClient : IDisposable
{
/// <summary>
/// Publish a value to a topic in a cache.
/// </summary>
/// <param name="cacheName">Name of the cache containing the topic.</param>
/// <param name="topicName">Name of the topic.</param>
/// <param name="value">The value to be published.</param>
/// <returns>
/// Task object representing the result of the publish operation. The
/// response object is resolved to a type-safe object of one of
/// the following subtypes:
/// <list type="bullet">
/// <item><description>TopicPublishResponse.Success</description></item>
/// <item><description>TopicPublishResponse.Error</description></item>
/// </list>
/// Pattern matching can be used to operate on the appropriate subtype.
/// For example:
/// <code>
/// if (response is TopicPublishResponse.Error errorResponse)
/// {
/// // handle error as appropriate
/// }
/// </code>
/// </returns>
public Task<TopicPublishResponse> PublishAsync(string cacheName, string topicName, byte[] value);

/// <inheritdoc cref="PublishAsync(string, string, byte[])"/>
public Task<TopicPublishResponse> PublishAsync(string cacheName, string topicName, string value);

/// <summary>
/// Subscribe to a topic. The returned value can be used to iterate over newly published messages on the topic.
/// </summary>
/// <param name="cacheName">Name of the cache containing the topic.</param>
/// <param name="topicName">Name of the topic.</param>
/// <param name="resumeAtSequenceNumber">The sequence number of the last message.
/// If provided, the client will attempt to start the stream from that sequence number.</param>
/// <returns>
/// Task object representing the result of the subscribe operation. The
/// response object is resolved to a type-safe object of one of
/// the following subtypes:
/// <list type="bullet">
/// <item><description>TopicSubscribeResponse.Subscription</description></item>
/// <item><description>TopicSubscribeResponse.Error</description></item>
/// </list>
/// Pattern matching can be used to operate on the appropriate subtype.
/// For example:
/// <code>
/// if (response is TopicSubscribeResponse.Error errorResponse)
/// {
/// // handle error as appropriate
/// }
/// </code>
/// </returns>
public Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null);
}
#endif
Loading

0 comments on commit 5ce2ff7

Please sign in to comment.