From 7e8a05df0333ceb90d4d94d6da79604b9ffa24cb Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 23 Sep 2024 16:50:11 +0100 Subject: [PATCH 1/4] Add client extension methods and refactor interfaces Refactor to use interfaces for enhanced flexibility and testability across various components like NatsJSContext and NatsConnection. Added new extension methods to easily create contexts for Object Store, Key-Value Store, and Services on NATS client and connection instances. --- sandbox/Example.Client/Program.cs | 19 +++++++++++ src/NATS.Client.JetStream/INatsJSContext.cs | 27 +++++++++++++++ .../Internal/NatsJSOrderedPushConsumer.cs | 10 +++--- .../NatsClientExtensions.cs | 10 ++++++ src/NATS.Client.JetStream/NatsJSConsumer.cs | 6 ++-- src/NATS.Client.JetStream/NatsJSContext.cs | 33 ++++++++++--------- src/NATS.Client.JetStream/NatsJSMsg.cs | 4 +-- .../Internal/NatsKVWatchSub.cs | 6 ++-- .../Internal/NatsKVWatcher.cs | 4 +-- .../NatsClientExtensions.cs | 31 +++++++++++++++++ .../NatsKVContext.cs | 4 +-- src/NATS.Client.KeyValueStore/NatsKVStore.cs | 4 +-- .../NatsClientExtensions.cs | 31 +++++++++++++++++ src/NATS.Client.ObjectStore/NatsObjContext.cs | 4 +-- src/NATS.Client.ObjectStore/NatsObjStore.cs | 4 +-- .../Internal/SvcListener.cs | 4 +-- .../NatsClientExtensions.cs | 22 +++++++++++++ src/NATS.Client.Services/NatsSvcContext.cs | 4 +-- src/NATS.Client.Services/NatsSvcEndPoint.cs | 6 ++-- src/NATS.Client.Services/NatsSvcServer.cs | 4 +-- .../NatsKVContextFactoryTest.cs | 5 +++ .../NatsObjContextFactoryTest.cs | 5 +++ 22 files changed, 200 insertions(+), 47 deletions(-) create mode 100644 src/NATS.Client.KeyValueStore/NatsClientExtensions.cs create mode 100644 src/NATS.Client.ObjectStore/NatsClientExtensions.cs create mode 100644 src/NATS.Client.Services/NatsClientExtensions.cs diff --git a/sandbox/Example.Client/Program.cs b/sandbox/Example.Client/Program.cs index 4bac3d56a..c72d89ba8 100644 --- a/sandbox/Example.Client/Program.cs +++ b/sandbox/Example.Client/Program.cs @@ -2,6 +2,9 @@ using System.Text; using NATS.Client.JetStream; +using NATS.Client.KeyValueStore; +using NATS.Client.ObjectStore; +using NATS.Client.Services; using NATS.Net; CancellationTokenSource cts = new(); @@ -95,6 +98,22 @@ Console.WriteLine($"JetStream Stream: {stream.Info.Config.Name}"); } +// Use KeyValueStore by referencing NATS.Client.KeyValueStore package +var kv1 = client.CreateKeyValueStoreContext(); +var kv2 = js.CreateKeyValueStoreContext(); +await kv1.CreateStoreAsync("store1"); +await kv2.CreateStoreAsync("store1"); + +// Use ObjectStore by referencing NATS.Client.ObjectStore package +var obj1 = client.CreateObjectStoreContext(); +var obj2 = js.CreateObjectStoreContext(); +await obj1.CreateObjectStoreAsync("store1"); +await obj2.CreateObjectStoreAsync("store1"); + +// Use Services by referencing NATS.Client.Services package +var svc = client.CreateServicesContext(); +await svc.AddServiceAsync("service1", "1.0.0"); + await cts.CancelAsync(); await Task.WhenAll(tasks); diff --git a/src/NATS.Client.JetStream/INatsJSContext.cs b/src/NATS.Client.JetStream/INatsJSContext.cs index 2c51a20cb..a23eff2c8 100644 --- a/src/NATS.Client.JetStream/INatsJSContext.cs +++ b/src/NATS.Client.JetStream/INatsJSContext.cs @@ -11,6 +11,11 @@ public interface INatsJSContext /// INatsConnection Connection { get; } + /// + /// Provides configuration options for the JetStream context. + /// + NatsJSOpts Opts { get; } + /// /// Creates new ordered consumer. /// @@ -296,4 +301,26 @@ ValueTask PublishConcurrentAsync( NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default); + + /// + /// Generates a new base inbox string using the connection's inbox prefix. + /// + /// A new inbox string. + string NewBaseInbox(); + + /// + /// Sends a request message to a JetStream subject and waits for a response. + /// + /// The JetStream API subject to send the request to. + /// The request message object. + /// A used to cancel the API call. + /// The type of the request message. + /// The type of the response message. + /// A task representing the asynchronous operation, with a result of type . + ValueTask JSRequestResponseAsync( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class; } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index 03e8e51bf..d10b1ac09 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -45,7 +45,7 @@ internal class NatsJSOrderedPushConsumer { private readonly ILogger _logger; private readonly bool _debug; - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly string _stream; private readonly string _filter; private readonly INatsDeserialize _serializer; @@ -68,7 +68,7 @@ internal class NatsJSOrderedPushConsumer private int _done; public NatsJSOrderedPushConsumer( - NatsJSContext context, + INatsJSContext context, string stream, string filter, INatsDeserialize serializer, @@ -417,7 +417,7 @@ private void CreateSub(string origin) internal class NatsJSOrderedPushConsumerSub : NatsSubBase { - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly CancellationToken _cancellationToken; private readonly INatsConnection _nats; private readonly NatsHeaderParser _headerParser; @@ -425,7 +425,7 @@ internal class NatsJSOrderedPushConsumerSub : NatsSubBase private readonly ChannelWriter> _commands; public NatsJSOrderedPushConsumerSub( - NatsJSContext context, + INatsJSContext context, Channel> commandChannel, INatsDeserialize serializer, NatsSubOpts? opts, @@ -433,7 +433,7 @@ public NatsJSOrderedPushConsumerSub( : base( connection: context.Connection, manager: context.Connection.SubscriptionManager, - subject: context.NewInbox(), + subject: context.NewBaseInbox(), queueGroup: default, opts) { diff --git a/src/NATS.Client.JetStream/NatsClientExtensions.cs b/src/NATS.Client.JetStream/NatsClientExtensions.cs index 31e4e9809..48793754a 100644 --- a/src/NATS.Client.JetStream/NatsClientExtensions.cs +++ b/src/NATS.Client.JetStream/NatsClientExtensions.cs @@ -4,9 +4,19 @@ namespace NATS.Client.JetStream; public static class NatsClientExtensions { + /// + /// Creates a JetStream context using the provided NATS client. + /// + /// The NATS client used to create the JetStream context. + /// Returns an instance of for interacting with JetStream. public static INatsJSContext CreateJetStreamContext(this INatsClient client) => CreateJetStreamContext(client.Connection); + /// + /// Creates a JetStream context using the provided NATS connection. + /// + /// The NATS connection used to create the JetStream context. + /// Returns an instance of for interacting with JetStream. public static INatsJSContext CreateJetStreamContext(this INatsConnection connection) => new NatsJSContext(connection); } diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 9f21017ec..6d8fd736f 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -284,7 +284,7 @@ internal async ValueTask> ConsumeInternalAsync(INatsDeserial opts ??= new NatsJSConsumeOpts(); serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); - var inbox = _context.NewInbox(); + var inbox = _context.NewBaseInbox(); var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes); var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); @@ -332,7 +332,7 @@ internal async ValueTask> OrderedConsumeInternalAsync ThrowIfDeleted(); serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); - var inbox = _context.NewInbox(); + var inbox = _context.NewBaseInbox(); var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes); var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); @@ -382,7 +382,7 @@ internal async ValueTask> FetchInternalAsync( ThrowIfDeleted(); serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); - var inbox = _context.NewInbox(); + var inbox = _context.NewBaseInbox(); var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes); var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 66ed5cacc..499263472 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -32,7 +32,8 @@ public NatsJSContext(INatsConnection connection, NatsJSOpts opts) public INatsConnection Connection { get; } - internal NatsJSOpts Opts { get; } + /// + public NatsJSOpts Opts { get; } /// /// Calls JetStream Account Info API. @@ -238,6 +239,22 @@ public async ValueTask PublishConcurrentAsync( return new NatsJSPublishConcurrentFuture(sub); } + /// + public string NewBaseInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix); + + /// + public async ValueTask JSRequestResponseAsync( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + { + var response = await JSRequestAsync(subject, request, cancellationToken); + response.EnsureSuccess(); + return response.Response!; + } + internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArgumentExpression("name")] string? paramName = null) { #if NETSTANDARD @@ -262,20 +279,6 @@ internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArg } } - internal string NewInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix); - - internal async ValueTask JSRequestResponseAsync( - string subject, - TRequest? request, - CancellationToken cancellationToken = default) - where TRequest : class - where TResponse : class - { - var response = await JSRequestAsync(subject, request, cancellationToken); - response.EnsureSuccess(); - return response.Response!; - } - internal async ValueTask> JSRequestAsync( string subject, TRequest? request, diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index e5a0bd151..cc6830054 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -140,11 +140,11 @@ public interface INatsJSMsg /// User message type public readonly struct NatsJSMsg : INatsJSMsg { - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly NatsMsg _msg; private readonly Lazy _replyToDateTimeAndSeq; - public NatsJSMsg(NatsMsg msg, NatsJSContext context) + public NatsJSMsg(NatsMsg msg, INatsJSContext context) { _msg = msg; _context = context; diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs index e2f7f1d3b..7b7f59335 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs @@ -7,7 +7,7 @@ namespace NATS.Client.KeyValueStore.Internal; internal class NatsKVWatchSub : NatsSubBase { - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly CancellationToken _cancellationToken; private readonly INatsConnection _nats; private readonly NatsHeaderParser _headerParser; @@ -15,7 +15,7 @@ internal class NatsKVWatchSub : NatsSubBase private readonly ChannelWriter> _commands; public NatsKVWatchSub( - NatsJSContext context, + INatsJSContext context, Channel> commandChannel, INatsDeserialize serializer, NatsSubOpts? opts, @@ -23,7 +23,7 @@ public NatsKVWatchSub( : base( connection: context.Connection, manager: context.Connection.SubscriptionManager, - subject: context.NewInbox(), + subject: context.NewBaseInbox(), queueGroup: default, opts) { diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs index 30f54dd00..ad2812eeb 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs @@ -28,7 +28,7 @@ internal sealed class NatsKVWatcher : IAsyncDisposable { private readonly ILogger _logger; private readonly bool _debug; - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly string _bucket; private readonly INatsDeserialize _serializer; private readonly NatsKVWatchOpts _opts; @@ -53,7 +53,7 @@ internal sealed class NatsKVWatcher : IAsyncDisposable private INatsJSConsumer? _initialConsumer; public NatsKVWatcher( - NatsJSContext context, + INatsJSContext context, string bucket, IEnumerable keys, INatsDeserialize serializer, diff --git a/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs b/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs new file mode 100644 index 000000000..43db7740d --- /dev/null +++ b/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs @@ -0,0 +1,31 @@ +using NATS.Client.Core; +using NATS.Client.JetStream; + +namespace NATS.Client.KeyValueStore; + +public static class NatsClientExtensions +{ + /// + /// Creates a NATS Key-Value Store context using the specified NATS client. + /// + /// The NATS client instance. + /// An instance of which can be used to interact with the Key-Value Store. + public static INatsKVContext CreateKeyValueStoreContext(this INatsClient client) + => CreateKeyValueStoreContext(client.Connection); + + /// + /// Creates a NATS Key-Value Store context using the specified NATS connection. + /// + /// The NATS connection instance. + /// An instance of which can be used to interact with the Key-Value Store. + public static INatsKVContext CreateKeyValueStoreContext(this INatsConnection connection) + => CreateKeyValueStoreContext(new NatsJSContext(connection)); + + /// + /// Creates a NATS Key-Value Store context using the specified NATS JetStream context. + /// + /// The NATS JetStream context instance. + /// An instance of which can be used to interact with the Key-Value Store. + public static INatsKVContext CreateKeyValueStoreContext(this INatsJSContext context) + => new NatsKVContext(context); +} diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index 6c4279739..22709c7d3 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -23,13 +23,13 @@ public class NatsKVContext : INatsKVContext private static readonly int KvStreamNamePrefixLen = KvStreamNamePrefix.Length; private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled); - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; /// /// Create a new Key Value Store context /// /// JetStream context - public NatsKVContext(NatsJSContext context) => _context = context; + public NatsKVContext(INatsJSContext context) => _context = context; /// /// Create a new Key Value Store or get an existing one diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index d42c9b6f2..a33fd6e2f 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -44,10 +44,10 @@ public class NatsKVStore : INatsKVStore private const string NatsSequence = "Nats-Sequence"; private const string NatsTimeStamp = "Nats-Time-Stamp"; private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled); - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly INatsJSStream _stream; - internal NatsKVStore(string bucket, NatsJSContext context, INatsJSStream stream) + internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream) { Bucket = bucket; _context = context; diff --git a/src/NATS.Client.ObjectStore/NatsClientExtensions.cs b/src/NATS.Client.ObjectStore/NatsClientExtensions.cs new file mode 100644 index 000000000..7da625b10 --- /dev/null +++ b/src/NATS.Client.ObjectStore/NatsClientExtensions.cs @@ -0,0 +1,31 @@ +using NATS.Client.Core; +using NATS.Client.JetStream; + +namespace NATS.Client.ObjectStore; + +public static class NatsClientExtensions +{ + /// + /// Creates a NATS Object Store context for the given NATS client. + /// + /// The NATS client instance. + /// An instance of used for interacting with the NATS Object Store. + public static INatsObjContext CreateObjectStoreContext(this INatsClient client) + => CreateObjectStoreContext(client.Connection); + + /// + /// Creates a NATS Object Store context for the given NATS connection. + /// + /// The NATS connection instance. + /// An instance of used for interacting with the NATS Object Store. + public static INatsObjContext CreateObjectStoreContext(this INatsConnection connection) + => CreateObjectStoreContext(new NatsJSContext(connection)); + + /// + /// Creates a NATS Object Store context for the given NATS JetStream context. + /// + /// The NATS JetStream context instance. + /// An instance of used for interacting with the NATS Object Store. + public static INatsObjContext CreateObjectStoreContext(this INatsJSContext context) + => new NatsObjContext(context); +} diff --git a/src/NATS.Client.ObjectStore/NatsObjContext.cs b/src/NATS.Client.ObjectStore/NatsObjContext.cs index c2afb850f..ac782ccb5 100644 --- a/src/NATS.Client.ObjectStore/NatsObjContext.cs +++ b/src/NATS.Client.ObjectStore/NatsObjContext.cs @@ -12,13 +12,13 @@ public class NatsObjContext : INatsObjContext { private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled); - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; /// /// Create a new object store context. /// /// JetStream context. - public NatsObjContext(NatsJSContext context) => _context = context; + public NatsObjContext(INatsJSContext context) => _context = context; /// /// Create a new object store. diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 5e6c8006a..869b4fe45 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -28,10 +28,10 @@ public class NatsObjStore : INatsObjStore private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } }; private readonly NatsObjContext _objContext; - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly INatsJSStream _stream; - internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSContext context, INatsJSStream stream) + internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, INatsJSContext context, INatsJSStream stream) { Bucket = config.Bucket; _objContext = objContext; diff --git a/src/NATS.Client.Services/Internal/SvcListener.cs b/src/NATS.Client.Services/Internal/SvcListener.cs index 6b3edb220..72ce5fd16 100644 --- a/src/NATS.Client.Services/Internal/SvcListener.cs +++ b/src/NATS.Client.Services/Internal/SvcListener.cs @@ -7,7 +7,7 @@ namespace NATS.Client.Services.Internal; internal class SvcListener : IAsyncDisposable { private readonly ILogger _logger; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly Channel _channel; private readonly SvcMsgType _type; private readonly string _subject; @@ -16,7 +16,7 @@ internal class SvcListener : IAsyncDisposable private INatsSub>? _sub; private Task? _readLoop; - public SvcListener(ILogger logger, NatsConnection nats, Channel channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken) + public SvcListener(ILogger logger, INatsConnection nats, Channel channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken) { _logger = logger; _nats = nats; diff --git a/src/NATS.Client.Services/NatsClientExtensions.cs b/src/NATS.Client.Services/NatsClientExtensions.cs new file mode 100644 index 000000000..ac6301736 --- /dev/null +++ b/src/NATS.Client.Services/NatsClientExtensions.cs @@ -0,0 +1,22 @@ +using NATS.Client.Core; + +namespace NATS.Client.Services; + +public static class NatsClientExtensions +{ + /// + /// Creates a NATS Services context for the given NATS client. + /// + /// The NATS client for which to create the services context. + /// An instance of used for interacting with the NATS Services. + public static INatsSvcContext CreateServicesContext(this INatsClient client) + => CreateServicesContext(client.Connection); + + /// + /// Creates a NATS Services context for the given NATS connection. + /// + /// The NATS connection for which to create the services context. + /// An instance of used for interacting with the NATS Services. + public static INatsSvcContext CreateServicesContext(this INatsConnection connection) + => new NatsSvcContext(connection); +} diff --git a/src/NATS.Client.Services/NatsSvcContext.cs b/src/NATS.Client.Services/NatsSvcContext.cs index cfcfb7eb0..42f78d70e 100644 --- a/src/NATS.Client.Services/NatsSvcContext.cs +++ b/src/NATS.Client.Services/NatsSvcContext.cs @@ -7,13 +7,13 @@ namespace NATS.Client.Services; /// public class NatsSvcContext : INatsSvcContext { - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; /// /// Creates a new instance of . /// /// NATS connection. - public NatsSvcContext(NatsConnection nats) => _nats = nats; + public NatsSvcContext(INatsConnection nats) => _nats = nats; /// /// Adds a new service. diff --git a/src/NATS.Client.Services/NatsSvcEndPoint.cs b/src/NATS.Client.Services/NatsSvcEndPoint.cs index 8e6947c2f..6db64067a 100644 --- a/src/NATS.Client.Services/NatsSvcEndPoint.cs +++ b/src/NATS.Client.Services/NatsSvcEndPoint.cs @@ -69,7 +69,7 @@ public interface INatsSvcEndpoint : IAsyncDisposable /// public abstract class NatsSvcEndpointBase : NatsSubBase, INatsSvcEndpoint { - protected NatsSvcEndpointBase(NatsConnection connection, string subject, string? queueGroup, NatsSubOpts? opts) + protected NatsSvcEndpointBase(INatsConnection connection, string subject, string? queueGroup, NatsSubOpts? opts) : base(connection, connection.SubscriptionManager, subject, queueGroup, opts) { } @@ -108,7 +108,7 @@ public class NatsSvcEndpoint : NatsSvcEndpointBase { private readonly ILogger _logger; private readonly Func, ValueTask> _handler; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly CancellationToken _cancellationToken; private readonly Channel> _channel; private readonly INatsDeserialize _serializer; @@ -131,7 +131,7 @@ public class NatsSvcEndpoint : NatsSvcEndpointBase /// Serializer to use for the message type. /// Subscription options. /// A used to cancel the API call. - public NatsSvcEndpoint(NatsConnection nats, string? queueGroup, string name, Func, ValueTask> handler, string subject, IDictionary? metadata, INatsDeserialize serializer, NatsSubOpts? opts, CancellationToken cancellationToken) + public NatsSvcEndpoint(INatsConnection nats, string? queueGroup, string name, Func, ValueTask> handler, string subject, IDictionary? metadata, INatsDeserialize serializer, NatsSubOpts? opts, CancellationToken cancellationToken) : base(nats, subject, queueGroup, opts) { _logger = nats.Opts.LoggerFactory.CreateLogger>(); diff --git a/src/NATS.Client.Services/NatsSvcServer.cs b/src/NATS.Client.Services/NatsSvcServer.cs index 08f4379b4..1b30c21c8 100644 --- a/src/NATS.Client.Services/NatsSvcServer.cs +++ b/src/NATS.Client.Services/NatsSvcServer.cs @@ -16,7 +16,7 @@ public class NatsSvcServer : INatsSvcServer { private readonly ILogger _logger; private readonly string _id; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly NatsSvcConfig _config; private readonly Channel _channel; private readonly Task _taskMsgLoop; @@ -31,7 +31,7 @@ public class NatsSvcServer : INatsSvcServer /// NATS connection. /// Service configuration. /// A used to cancel the service creation requests. - public NatsSvcServer(NatsConnection nats, NatsSvcConfig config, CancellationToken cancellationToken) + public NatsSvcServer(INatsConnection nats, NatsSvcConfig config, CancellationToken cancellationToken) { _logger = nats.Opts.LoggerFactory.CreateLogger(); _id = Nuid.NewNuid(); diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs index 0270ae9f9..a272f2760 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs @@ -49,6 +49,7 @@ public void Create_Context_WithMockConnection_Test() public class MockJsContext : INatsJSContext { public INatsConnection Connection { get; } = new NatsConnection(); + public NatsJSOpts Opts { get; } public ValueTask CreateOrderedConsumerAsync(string stream, NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); @@ -91,5 +92,9 @@ public class MockJsContext : INatsJSContext public IAsyncEnumerable ListStreamNamesAsync(string? subject = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask PublishConcurrentAsync(string subject, T? data, INatsSerialize? serializer = default, NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + + public string NewBaseInbox() => throw new NotImplementedException(); + + public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class => throw new NotImplementedException(); } } diff --git a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs index 2ed5bb6f8..0a59e9e1b 100644 --- a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs @@ -49,6 +49,7 @@ public void Create_Context_WithMockConnection_Test() public class MockJsContext : INatsJSContext { public INatsConnection Connection { get; } = new NatsConnection(); + public NatsJSOpts Opts { get; } public ValueTask CreateOrderedConsumerAsync(string stream, NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); @@ -91,5 +92,9 @@ public class MockJsContext : INatsJSContext public IAsyncEnumerable ListStreamNamesAsync(string? subject = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask PublishConcurrentAsync(string subject, T? data, INatsSerialize? serializer = default, NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + + public string NewBaseInbox() => throw new NotImplementedException(); + + public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class => throw new NotImplementedException(); } } From 98fd316713600d6e4a3e485ca80e000a3929130c Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 23 Sep 2024 16:58:21 +0100 Subject: [PATCH 2/4] Add public `Context` properties to store interfaces Updated INatsObjStore, INatsKVContext, and INatsSvcContext interfaces to include public `Context` properties. This change ensures consistent access to the underlying context objects across various components. --- .../INatsKVContext.cs | 5 ++++ .../NatsKVContext.cs | 27 ++++++++++--------- src/NATS.Client.ObjectStore/INatsObjStore.cs | 5 ++++ src/NATS.Client.ObjectStore/NatsObjStore.cs | 26 +++++++++--------- src/NATS.Client.Services/INatsSvcContext.cs | 7 +++++ src/NATS.Client.Services/NatsSvcContext.cs | 9 ++++--- 6 files changed, 50 insertions(+), 29 deletions(-) diff --git a/src/NATS.Client.KeyValueStore/INatsKVContext.cs b/src/NATS.Client.KeyValueStore/INatsKVContext.cs index 2c00bf951..1ad134012 100644 --- a/src/NATS.Client.KeyValueStore/INatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/INatsKVContext.cs @@ -4,6 +4,11 @@ namespace NATS.Client.KeyValueStore; public interface INatsKVContext { + /// + /// Provides access to the JetStream context associated with the Key-Value Store operations. + /// + INatsJSContext Context { get; } + /// /// Create a new Key Value Store or get an existing one /// diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index 22709c7d3..3918c385f 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -23,13 +23,14 @@ public class NatsKVContext : INatsKVContext private static readonly int KvStreamNamePrefixLen = KvStreamNamePrefix.Length; private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled); - private readonly INatsJSContext _context; - /// /// Create a new Key Value Store context /// /// JetStream context - public NatsKVContext(INatsJSContext context) => _context = context; + public NatsKVContext(INatsJSContext context) => Context = context; + + /// + public INatsJSContext Context { get; } /// /// Create a new Key Value Store or get an existing one @@ -57,9 +58,9 @@ public async ValueTask CreateStoreAsync(NatsKVConfig config, Cance var streamConfig = NatsKVContext.CreateStreamConfig(config); - var stream = await _context.CreateStreamAsync(streamConfig, cancellationToken); + var stream = await Context.CreateStreamAsync(streamConfig, cancellationToken); - return new NatsKVStore(config.Bucket, _context, stream); + return new NatsKVStore(config.Bucket, Context, stream); } /// @@ -75,7 +76,7 @@ public async ValueTask GetStoreAsync(string bucket, CancellationTo { ValidateBucketName(bucket); - var stream = await _context.GetStreamAsync(BucketToStream(bucket), cancellationToken: cancellationToken); + var stream = await Context.GetStreamAsync(BucketToStream(bucket), cancellationToken: cancellationToken); if (stream.Info.Config.MaxMsgsPerSubject < 1) { @@ -83,7 +84,7 @@ public async ValueTask GetStoreAsync(string bucket, CancellationTo } // TODO: KV mirror - return new NatsKVStore(bucket, _context, stream); + return new NatsKVStore(bucket, Context, stream); } /// @@ -101,9 +102,9 @@ public async ValueTask UpdateStoreAsync(NatsKVConfig config, Cance var streamConfig = NatsKVContext.CreateStreamConfig(config); - var stream = await _context.UpdateStreamAsync(streamConfig, cancellationToken); + var stream = await Context.UpdateStreamAsync(streamConfig, cancellationToken); - return new NatsKVStore(config.Bucket, _context, stream); + return new NatsKVStore(config.Bucket, Context, stream); } /// @@ -117,7 +118,7 @@ public async ValueTask UpdateStoreAsync(NatsKVConfig config, Cance public ValueTask DeleteStoreAsync(string bucket, CancellationToken cancellationToken = default) { ValidateBucketName(bucket); - return _context.DeleteStreamAsync(BucketToStream(bucket), cancellationToken); + return Context.DeleteStreamAsync(BucketToStream(bucket), cancellationToken); } /// @@ -129,7 +130,7 @@ public ValueTask DeleteStoreAsync(string bucket, CancellationToken cancell /// Server responded with an error. public async IAsyncEnumerable GetBucketNamesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - await foreach (var name in _context.ListStreamNamesAsync(cancellationToken: cancellationToken)) + await foreach (var name in Context.ListStreamNamesAsync(cancellationToken: cancellationToken)) { if (!name.StartsWith(KvStreamNamePrefix)) { @@ -149,9 +150,9 @@ public async IAsyncEnumerable GetBucketNamesAsync([EnumeratorCancellatio /// Server responded with an error. public async IAsyncEnumerable GetStatusesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - await foreach (var name in _context.ListStreamNamesAsync(cancellationToken: cancellationToken)) + await foreach (var name in Context.ListStreamNamesAsync(cancellationToken: cancellationToken)) { - var stream = await _context.GetStreamAsync(name, cancellationToken: cancellationToken); + var stream = await Context.GetStreamAsync(name, cancellationToken: cancellationToken); var isCompressed = stream.Info.Config.Compression != StreamConfigCompression.None; yield return new NatsKVStatus(name, isCompressed, stream.Info); } diff --git a/src/NATS.Client.ObjectStore/INatsObjStore.cs b/src/NATS.Client.ObjectStore/INatsObjStore.cs index 255a5618b..8a0b0f288 100644 --- a/src/NATS.Client.ObjectStore/INatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/INatsObjStore.cs @@ -8,6 +8,11 @@ namespace NATS.Client.ObjectStore; /// public interface INatsObjStore { + /// + /// Provides access to the JetStream context associated with the Object Store operations. + /// + INatsJSContext Context { get; } + /// /// Object store bucket name. /// diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 869b4fe45..c09cf245b 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -28,17 +28,19 @@ public class NatsObjStore : INatsObjStore private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } }; private readonly NatsObjContext _objContext; - private readonly INatsJSContext _context; private readonly INatsJSStream _stream; internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, INatsJSContext context, INatsJSStream stream) { Bucket = config.Bucket; _objContext = objContext; - _context = context; + Context = context; _stream = stream; } + /// + public INatsJSContext Context { get; } + /// /// Object store bucket name. /// @@ -84,7 +86,7 @@ public async ValueTask GetAsync(string key, Stream stream, bool } await using var pushConsumer = new NatsJSOrderedPushConsumer>( - context: _context, + context: Context, stream: $"OBJ_{Bucket}", filter: GetChunkSubject(info.Nuid), serializer: NatsDefaultSerializer>.Default, @@ -294,7 +296,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre var buffer = memoryOwner.Slice(0, currentChunkSize); // Chunks - var ack = await _context.PublishAsync(GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); + var ack = await Context.PublishAsync(GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); ack.EnsureSuccess(); if (eof) @@ -320,8 +322,8 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre { try { - await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{Bucket}", + await Context.JSRequestResponseAsync( + subject: $"{Context.Opts.Prefix}.STREAM.PURGE.OBJ_{Bucket}", request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid), @@ -495,16 +497,16 @@ public async ValueTask AddBucketLinkAsync(string link, INatsObjS /// Update operation failed public async ValueTask SealAsync(CancellationToken cancellationToken = default) { - var info = await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.STREAM.INFO.{_stream.Info.Config.Name}", + var info = await Context.JSRequestResponseAsync( + subject: $"{Context.Opts.Prefix}.STREAM.INFO.{_stream.Info.Config.Name}", request: null, cancellationToken).ConfigureAwait(false); var config = info.Config; config.Sealed = true; - var response = await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.STREAM.UPDATE.{_stream.Info.Config.Name}", + var response = await Context.JSRequestResponseAsync( + subject: $"{Context.Opts.Prefix}.STREAM.UPDATE.{_stream.Info.Config.Name}", request: config, cancellationToken); @@ -616,7 +618,7 @@ public async IAsyncEnumerable WatchAsync(NatsObjWatchOpts? opts } await using var pushConsumer = new NatsJSOrderedPushConsumer>( - context: _context, + context: Context, stream: $"OBJ_{Bucket}", filter: $"$O.{Bucket}.M.>", serializer: NatsDefaultSerializer>.Default, @@ -696,7 +698,7 @@ public async ValueTask DeleteAsync(string key, CancellationToken cancellationTok private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken) { - var ack = await _context.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken); + var ack = await Context.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken); ack.EnsureSuccess(); } diff --git a/src/NATS.Client.Services/INatsSvcContext.cs b/src/NATS.Client.Services/INatsSvcContext.cs index b06513dcf..1e8a683fa 100644 --- a/src/NATS.Client.Services/INatsSvcContext.cs +++ b/src/NATS.Client.Services/INatsSvcContext.cs @@ -1,3 +1,5 @@ +using NATS.Client.Core; + namespace NATS.Client.Services; /// @@ -5,6 +7,11 @@ namespace NATS.Client.Services; /// public interface INatsSvcContext { + /// + /// Gets the associated NATS connection. + /// + INatsConnection Connection { get; } + /// /// Adds a new service. /// diff --git a/src/NATS.Client.Services/NatsSvcContext.cs b/src/NATS.Client.Services/NatsSvcContext.cs index 42f78d70e..9a450c31e 100644 --- a/src/NATS.Client.Services/NatsSvcContext.cs +++ b/src/NATS.Client.Services/NatsSvcContext.cs @@ -7,13 +7,14 @@ namespace NATS.Client.Services; /// public class NatsSvcContext : INatsSvcContext { - private readonly INatsConnection _nats; - /// /// Creates a new instance of . /// /// NATS connection. - public NatsSvcContext(INatsConnection nats) => _nats = nats; + public NatsSvcContext(INatsConnection nats) => Connection = nats; + + /// + public INatsConnection Connection { get; } /// /// Adds a new service. @@ -34,7 +35,7 @@ public ValueTask AddServiceAsync(string name, string version, st /// NATS Service instance. public async ValueTask AddServiceAsync(NatsSvcConfig config, CancellationToken cancellationToken = default) { - var service = new NatsSvcServer(_nats, config, cancellationToken); + var service = new NatsSvcServer(Connection, config, cancellationToken); await service.StartAsync().ConfigureAwait(false); return service; } From f0bd695e9100b77a9daf3bc8e7155a391604a291 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 23 Sep 2024 17:28:06 +0100 Subject: [PATCH 3/4] dotnet format --- src/NATS.Client.KeyValueStore/NatsClientExtensions.cs | 2 +- src/NATS.Client.ObjectStore/NatsClientExtensions.cs | 2 +- src/NATS.Client.Services/NatsClientExtensions.cs | 2 +- .../NatsKVContextFactoryTest.cs | 6 +++++- .../NatsObjContextFactoryTest.cs | 6 +++++- 5 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs b/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs index 43db7740d..55a44919e 100644 --- a/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs +++ b/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs @@ -1,4 +1,4 @@ -using NATS.Client.Core; +using NATS.Client.Core; using NATS.Client.JetStream; namespace NATS.Client.KeyValueStore; diff --git a/src/NATS.Client.ObjectStore/NatsClientExtensions.cs b/src/NATS.Client.ObjectStore/NatsClientExtensions.cs index 7da625b10..b26aa707f 100644 --- a/src/NATS.Client.ObjectStore/NatsClientExtensions.cs +++ b/src/NATS.Client.ObjectStore/NatsClientExtensions.cs @@ -1,4 +1,4 @@ -using NATS.Client.Core; +using NATS.Client.Core; using NATS.Client.JetStream; namespace NATS.Client.ObjectStore; diff --git a/src/NATS.Client.Services/NatsClientExtensions.cs b/src/NATS.Client.Services/NatsClientExtensions.cs index ac6301736..f6699bc03 100644 --- a/src/NATS.Client.Services/NatsClientExtensions.cs +++ b/src/NATS.Client.Services/NatsClientExtensions.cs @@ -1,4 +1,4 @@ -using NATS.Client.Core; +using NATS.Client.Core; namespace NATS.Client.Services; diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs index a272f2760..c1252a1e9 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs @@ -49,6 +49,7 @@ public void Create_Context_WithMockConnection_Test() public class MockJsContext : INatsJSContext { public INatsConnection Connection { get; } = new NatsConnection(); + public NatsJSOpts Opts { get; } public ValueTask CreateOrderedConsumerAsync(string stream, NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); @@ -95,6 +96,9 @@ public class MockJsContext : INatsJSContext public string NewBaseInbox() => throw new NotImplementedException(); - public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class => throw new NotImplementedException(); + public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + => throw new NotImplementedException(); } } diff --git a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs index 0a59e9e1b..0348c9bd5 100644 --- a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs @@ -49,6 +49,7 @@ public void Create_Context_WithMockConnection_Test() public class MockJsContext : INatsJSContext { public INatsConnection Connection { get; } = new NatsConnection(); + public NatsJSOpts Opts { get; } public ValueTask CreateOrderedConsumerAsync(string stream, NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); @@ -95,6 +96,9 @@ public class MockJsContext : INatsJSContext public string NewBaseInbox() => throw new NotImplementedException(); - public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class => throw new NotImplementedException(); + public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + => throw new NotImplementedException(); } } From 07692b67dafa93a149e467119f78a36152ceab79 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 24 Sep 2024 08:57:11 +0100 Subject: [PATCH 4/4] Refactor context creation to use JetStream directly --- src/NATS.Client.KeyValueStore/NatsClientExtensions.cs | 4 ++-- src/NATS.Client.ObjectStore/NatsClientExtensions.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs b/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs index 55a44919e..80ef31708 100644 --- a/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs +++ b/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs @@ -11,7 +11,7 @@ public static class NatsClientExtensions /// The NATS client instance. /// An instance of which can be used to interact with the Key-Value Store. public static INatsKVContext CreateKeyValueStoreContext(this INatsClient client) - => CreateKeyValueStoreContext(client.Connection); + => CreateKeyValueStoreContext(client.CreateJetStreamContext()); /// /// Creates a NATS Key-Value Store context using the specified NATS connection. @@ -19,7 +19,7 @@ public static INatsKVContext CreateKeyValueStoreContext(this INatsClient client) /// The NATS connection instance. /// An instance of which can be used to interact with the Key-Value Store. public static INatsKVContext CreateKeyValueStoreContext(this INatsConnection connection) - => CreateKeyValueStoreContext(new NatsJSContext(connection)); + => CreateKeyValueStoreContext(connection.CreateJetStreamContext()); /// /// Creates a NATS Key-Value Store context using the specified NATS JetStream context. diff --git a/src/NATS.Client.ObjectStore/NatsClientExtensions.cs b/src/NATS.Client.ObjectStore/NatsClientExtensions.cs index b26aa707f..cc82c2bc0 100644 --- a/src/NATS.Client.ObjectStore/NatsClientExtensions.cs +++ b/src/NATS.Client.ObjectStore/NatsClientExtensions.cs @@ -11,7 +11,7 @@ public static class NatsClientExtensions /// The NATS client instance. /// An instance of used for interacting with the NATS Object Store. public static INatsObjContext CreateObjectStoreContext(this INatsClient client) - => CreateObjectStoreContext(client.Connection); + => CreateObjectStoreContext(client.CreateJetStreamContext()); /// /// Creates a NATS Object Store context for the given NATS connection. @@ -19,7 +19,7 @@ public static INatsObjContext CreateObjectStoreContext(this INatsClient client) /// The NATS connection instance. /// An instance of used for interacting with the NATS Object Store. public static INatsObjContext CreateObjectStoreContext(this INatsConnection connection) - => CreateObjectStoreContext(new NatsJSContext(connection)); + => CreateObjectStoreContext(connection.CreateJetStreamContext()); /// /// Creates a NATS Object Store context for the given NATS JetStream context.