Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add other client extensions #637

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions sandbox/Example.Client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

using System.Text;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;
using NATS.Client.ObjectStore;
using NATS.Client.Services;
using NATS.Net;

CancellationTokenSource cts = new();
Expand Down Expand Up @@ -95,6 +98,22 @@
Console.WriteLine($"JetStream Stream: {stream.Info.Config.Name}");
}

// Use KeyValueStore by referencing NATS.Client.KeyValueStore package
var kv1 = client.CreateKeyValueStoreContext();
var kv2 = js.CreateKeyValueStoreContext();
await kv1.CreateStoreAsync("store1");
await kv2.CreateStoreAsync("store1");

// Use ObjectStore by referencing NATS.Client.ObjectStore package
var obj1 = client.CreateObjectStoreContext();
var obj2 = js.CreateObjectStoreContext();
await obj1.CreateObjectStoreAsync("store1");
await obj2.CreateObjectStoreAsync("store1");

// Use Services by referencing NATS.Client.Services package
var svc = client.CreateServicesContext();
await svc.AddServiceAsync("service1", "1.0.0");

await cts.CancelAsync();

await Task.WhenAll(tasks);
Expand Down
27 changes: 27 additions & 0 deletions src/NATS.Client.JetStream/INatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ public interface INatsJSContext
/// </summary>
INatsConnection Connection { get; }

/// <summary>
/// Provides configuration options for the JetStream context.
/// </summary>
NatsJSOpts Opts { get; }

/// <summary>
/// Creates new ordered consumer.
/// </summary>
Expand Down Expand Up @@ -296,4 +301,26 @@ ValueTask<NatsJSPublishConcurrentFuture> PublishConcurrentAsync<T>(
NatsJSPubOpts? opts = default,
NatsHeaders? headers = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Generates a new base inbox string using the connection's inbox prefix.
/// </summary>
/// <returns>A new inbox string.</returns>
string NewBaseInbox();

/// <summary>
/// Sends a request message to a JetStream subject and waits for a response.
/// </summary>
/// <param name="subject">The JetStream API subject to send the request to.</param>
/// <param name="request">The request message object.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="TRequest">The type of the request message.</typeparam>
/// <typeparam name="TResponse">The type of the response message.</typeparam>
/// <returns>A task representing the asynchronous operation, with a result of type <typeparamref name="TResponse"/>.</returns>
ValueTask<TResponse> JSRequestResponseAsync<TRequest, TResponse>(
string subject,
TRequest? request,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class;
}
10 changes: 5 additions & 5 deletions src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal class NatsJSOrderedPushConsumer<T>
{
private readonly ILogger _logger;
private readonly bool _debug;
private readonly NatsJSContext _context;
private readonly INatsJSContext _context;
private readonly string _stream;
private readonly string _filter;
private readonly INatsDeserialize<T> _serializer;
Expand All @@ -68,7 +68,7 @@ internal class NatsJSOrderedPushConsumer<T>
private int _done;

public NatsJSOrderedPushConsumer(
NatsJSContext context,
INatsJSContext context,
string stream,
string filter,
INatsDeserialize<T> serializer,
Expand Down Expand Up @@ -417,23 +417,23 @@ private void CreateSub(string origin)

internal class NatsJSOrderedPushConsumerSub<T> : NatsSubBase
{
private readonly NatsJSContext _context;
private readonly INatsJSContext _context;
private readonly CancellationToken _cancellationToken;
private readonly INatsConnection _nats;
private readonly NatsHeaderParser _headerParser;
private readonly INatsDeserialize<T> _serializer;
private readonly ChannelWriter<NatsJSOrderedPushConsumerMsg<T>> _commands;

public NatsJSOrderedPushConsumerSub(
NatsJSContext context,
INatsJSContext context,
Channel<NatsJSOrderedPushConsumerMsg<T>> commandChannel,
INatsDeserialize<T> serializer,
NatsSubOpts? opts,
CancellationToken cancellationToken)
: base(
connection: context.Connection,
manager: context.Connection.SubscriptionManager,
subject: context.NewInbox(),
subject: context.NewBaseInbox(),
queueGroup: default,
opts)
{
Expand Down
10 changes: 10 additions & 0 deletions src/NATS.Client.JetStream/NatsClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@ namespace NATS.Client.JetStream;

public static class NatsClientExtensions
{
/// <summary>
/// Creates a JetStream context using the provided NATS client.
/// </summary>
/// <param name="client">The NATS client used to create the JetStream context.</param>
/// <returns>Returns an instance of <see cref="INatsJSContext"/> for interacting with JetStream.</returns>
public static INatsJSContext CreateJetStreamContext(this INatsClient client)
=> CreateJetStreamContext(client.Connection);

/// <summary>
/// Creates a JetStream context using the provided NATS connection.
/// </summary>
/// <param name="connection">The NATS connection used to create the JetStream context.</param>
/// <returns>Returns an instance of <see cref="INatsJSContext"/> for interacting with JetStream.</returns>
public static INatsJSContext CreateJetStreamContext(this INatsConnection connection)
=> new NatsJSContext(connection);
}
6 changes: 3 additions & 3 deletions src/NATS.Client.JetStream/NatsJSConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ internal async ValueTask<NatsJSConsume<T>> ConsumeInternalAsync<T>(INatsDeserial

opts ??= new NatsJSConsumeOpts();
serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer<T>();
var inbox = _context.NewInbox();
var inbox = _context.NewBaseInbox();

var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes);
var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat);
Expand Down Expand Up @@ -332,7 +332,7 @@ internal async ValueTask<NatsJSOrderedConsume<T>> OrderedConsumeInternalAsync<T>
ThrowIfDeleted();

serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer<T>();
var inbox = _context.NewInbox();
var inbox = _context.NewBaseInbox();

var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes);
var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat);
Expand Down Expand Up @@ -382,7 +382,7 @@ internal async ValueTask<NatsJSFetch<T>> FetchInternalAsync<T>(
ThrowIfDeleted();
serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer<T>();

var inbox = _context.NewInbox();
var inbox = _context.NewBaseInbox();

var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes);
var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat);
Expand Down
33 changes: 18 additions & 15 deletions src/NATS.Client.JetStream/NatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public NatsJSContext(INatsConnection connection, NatsJSOpts opts)

public INatsConnection Connection { get; }

internal NatsJSOpts Opts { get; }
/// <inheritdoc />
public NatsJSOpts Opts { get; }

/// <summary>
/// Calls JetStream Account Info API.
Expand Down Expand Up @@ -238,6 +239,22 @@ public async ValueTask<NatsJSPublishConcurrentFuture> PublishConcurrentAsync<T>(
return new NatsJSPublishConcurrentFuture(sub);
}

/// <inheritdoc />
public string NewBaseInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix);

/// <inheritdoc />
public async ValueTask<TResponse> JSRequestResponseAsync<TRequest, TResponse>(
string subject,
TRequest? request,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
{
var response = await JSRequestAsync<TRequest, TResponse>(subject, request, cancellationToken);
response.EnsureSuccess();
return response.Response!;
}

internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArgumentExpression("name")] string? paramName = null)
{
#if NETSTANDARD
Expand All @@ -262,20 +279,6 @@ internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArg
}
}

internal string NewInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix);

internal async ValueTask<TResponse> JSRequestResponseAsync<TRequest, TResponse>(
string subject,
TRequest? request,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
{
var response = await JSRequestAsync<TRequest, TResponse>(subject, request, cancellationToken);
response.EnsureSuccess();
return response.Response!;
}

internal async ValueTask<NatsJSResponse<TResponse>> JSRequestAsync<TRequest, TResponse>(
string subject,
TRequest? request,
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.JetStream/NatsJSMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ public interface INatsJSMsg<out T>
/// <typeparam name="T">User message type</typeparam>
public readonly struct NatsJSMsg<T> : INatsJSMsg<T>
{
private readonly NatsJSContext _context;
private readonly INatsJSContext _context;
private readonly NatsMsg<T> _msg;
private readonly Lazy<NatsJSMsgMetadata?> _replyToDateTimeAndSeq;

public NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
public NatsJSMsg(NatsMsg<T> msg, INatsJSContext context)
{
_msg = msg;
_context = context;
Expand Down
5 changes: 5 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ namespace NATS.Client.KeyValueStore;

public interface INatsKVContext
{
/// <summary>
/// Provides access to the JetStream context associated with the Key-Value Store operations.
/// </summary>
INatsJSContext Context { get; }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me, but wondering what you think about folks using code like

var context = client.CreateKeyValueStoreContext();
...
var jsContext = context.Context;

I don't mind it, but the thought came to mind and figured I'd mention it.


/// <summary>
/// Create a new Key Value Store or get an existing one
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ namespace NATS.Client.KeyValueStore.Internal;

internal class NatsKVWatchSub<T> : NatsSubBase
{
private readonly NatsJSContext _context;
private readonly INatsJSContext _context;
private readonly CancellationToken _cancellationToken;
private readonly INatsConnection _nats;
private readonly NatsHeaderParser _headerParser;
private readonly INatsDeserialize<T> _serializer;
private readonly ChannelWriter<NatsKVWatchCommandMsg<T>> _commands;

public NatsKVWatchSub(
NatsJSContext context,
INatsJSContext context,
Channel<NatsKVWatchCommandMsg<T>> commandChannel,
INatsDeserialize<T> serializer,
NatsSubOpts? opts,
CancellationToken cancellationToken)
: base(
connection: context.Connection,
manager: context.Connection.SubscriptionManager,
subject: context.NewInbox(),
subject: context.NewBaseInbox(),
queueGroup: default,
opts)
{
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal sealed class NatsKVWatcher<T> : IAsyncDisposable
{
private readonly ILogger _logger;
private readonly bool _debug;
private readonly NatsJSContext _context;
private readonly INatsJSContext _context;
private readonly string _bucket;
private readonly INatsDeserialize<T> _serializer;
private readonly NatsKVWatchOpts _opts;
Expand All @@ -53,7 +53,7 @@ internal sealed class NatsKVWatcher<T> : IAsyncDisposable
private INatsJSConsumer? _initialConsumer;

public NatsKVWatcher(
NatsJSContext context,
INatsJSContext context,
string bucket,
IEnumerable<string> keys,
INatsDeserialize<T> serializer,
Expand Down
31 changes: 31 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using NATS.Client.Core;
using NATS.Client.JetStream;

namespace NATS.Client.KeyValueStore;

public static class NatsClientExtensions
{
/// <summary>
/// Creates a NATS Key-Value Store context using the specified NATS client.
/// </summary>
/// <param name="client">The NATS client instance.</param>
/// <returns>An instance of <see cref="INatsKVContext"/> which can be used to interact with the Key-Value Store.</returns>
public static INatsKVContext CreateKeyValueStoreContext(this INatsClient client)
=> CreateKeyValueStoreContext(client.Connection);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to chain these to the JS extensions?

public static INatsKVContext CreateKeyValueStoreContext(this INatsClient client)
    => CreateKeyValueStoreContext(client.CreateJetStreamContext());

public static INatsKVContext CreateKeyValueStoreContext(this INatsConnection connection)
    => CreateKeyValueStoreContext(connection.CreateJetStreamContext());


/// <summary>
/// Creates a NATS Key-Value Store context using the specified NATS connection.
/// </summary>
/// <param name="connection">The NATS connection instance.</param>
/// <returns>An instance of <see cref="INatsKVContext"/> which can be used to interact with the Key-Value Store.</returns>
public static INatsKVContext CreateKeyValueStoreContext(this INatsConnection connection)
=> CreateKeyValueStoreContext(new NatsJSContext(connection));

/// <summary>
/// Creates a NATS Key-Value Store context using the specified NATS JetStream context.
/// </summary>
/// <param name="context">The NATS JetStream context instance.</param>
/// <returns>An instance of <see cref="INatsKVContext"/> which can be used to interact with the Key-Value Store.</returns>
public static INatsKVContext CreateKeyValueStoreContext(this INatsJSContext context)
=> new NatsKVContext(context);
}
Loading
Loading