diff --git a/sandbox/BlazorWasm/Server/NatsServices/WeatherForecastService.cs b/sandbox/BlazorWasm/Server/NatsServices/WeatherForecastService.cs index bd181f099..2b77a7612 100644 --- a/sandbox/BlazorWasm/Server/NatsServices/WeatherForecastService.cs +++ b/sandbox/BlazorWasm/Server/NatsServices/WeatherForecastService.cs @@ -11,18 +11,18 @@ public class WeatherForecastService : IHostedService, IAsyncDisposable }; private readonly ILogger _logger; - private readonly INatsCommand _natsCommand; + private readonly INatsConnection _natsConnection; private NatsReplyHandle? _replyHandle; - public WeatherForecastService(ILogger logger, INatsCommand natsCommand) + public WeatherForecastService(ILogger logger, INatsConnection natsConnection) { _logger = logger; - _natsCommand = natsCommand; + _natsConnection = natsConnection; } public async Task StartAsync(CancellationToken cancellationToken) { - _replyHandle = await _natsCommand.ReplyAsync("weather", req => + _replyHandle = await _natsConnection.ReplyAsync("weather", req => { return Enumerable.Range(1, 5).Select(index => new WeatherForecast { diff --git a/sandbox/ConsoleApp/Program.cs b/sandbox/ConsoleApp/Program.cs index 9baabfec5..0bf7e343f 100644 --- a/sandbox/ConsoleApp/Program.cs +++ b/sandbox/ConsoleApp/Program.cs @@ -64,17 +64,17 @@ public record Person(int Age, string Name); public class Runner : ConsoleAppBase { - private readonly INatsCommand _command; + private readonly INatsConnection _connection; - public Runner(INatsCommand command) + public Runner(INatsConnection connection) { - _command = command; + _connection = connection; } [RootCommand] public async Task Run() { - var subscription = await _command.SubscribeAsync("foo"); + var subscription = await _connection.SubscribeAsync("foo"); _ = Task.Run(async () => { @@ -84,8 +84,8 @@ public async Task Run() } }); - await _command.PingAsync(); - await _command.PublishAsync("foo"); + await _connection.PingAsync(); + await _connection.PublishAsync("foo"); } } diff --git a/sandbox/MinimumWebApp/Program.cs b/sandbox/MinimumWebApp/Program.cs index 818fe5dbd..e62c9e059 100644 --- a/sandbox/MinimumWebApp/Program.cs +++ b/sandbox/MinimumWebApp/Program.cs @@ -9,7 +9,7 @@ var app = builder.Build(); -app.MapGet("/subscribe", async (INatsCommand command) => +app.MapGet("/subscribe", async (INatsConnection command) => { var subscription = await command.SubscribeAsync("foo"); @@ -22,6 +22,6 @@ }); }); -app.MapGet("/publish", async (INatsCommand command) => await command.PublishAsync("foo", 99)); +app.MapGet("/publish", async (INatsConnection command) => await command.PublishAsync("foo", 99)); app.Run(); diff --git a/src/NATS.Client.Core/Commands/FlushCommand.cs b/src/NATS.Client.Core/Commands/FlushCommand.cs deleted file mode 100644 index b89b0cf07..000000000 --- a/src/NATS.Client.Core/Commands/FlushCommand.cs +++ /dev/null @@ -1,30 +0,0 @@ -using NATS.Client.Core.Internal; - -namespace NATS.Client.Core.Commands; - -internal sealed class AsyncFlushCommand : AsyncCommandBase -{ - private AsyncFlushCommand() - { - } - - public static AsyncFlushCommand Create(ObjectPool pool, CancellationTimer timer) - { - if (!TryRent(pool, out var result)) - { - result = new AsyncFlushCommand(); - } - - result.SetCancellationTimer(timer); - - return result; - } - - public override void Write(ProtocolWriter writer) - { - } - - protected override void Reset() - { - } -} diff --git a/src/NATS.Client.Core/Internal/HeaderParser.cs b/src/NATS.Client.Core/HeaderParser.cs similarity index 99% rename from src/NATS.Client.Core/Internal/HeaderParser.cs rename to src/NATS.Client.Core/HeaderParser.cs index 27afc3f7f..f908a04d4 100644 --- a/src/NATS.Client.Core/Internal/HeaderParser.cs +++ b/src/NATS.Client.Core/HeaderParser.cs @@ -4,10 +4,11 @@ using System.Diagnostics; using System.Text; using Microsoft.Extensions.Primitives; +using NATS.Client.Core.Internal; -namespace NATS.Client.Core.Internal; +namespace NATS.Client.Core; -internal class HeaderParser +public class HeaderParser { private const byte ByteCR = (byte)'\r'; private const byte ByteLF = (byte)'\n'; diff --git a/src/NATS.Client.Core/INatsCommand.cs b/src/NATS.Client.Core/INatsConnection.cs similarity index 96% rename from src/NATS.Client.Core/INatsCommand.cs rename to src/NATS.Client.Core/INatsConnection.cs index adb1a5543..c064d64d7 100644 --- a/src/NATS.Client.Core/INatsCommand.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -2,10 +2,8 @@ namespace NATS.Client.Core; -public interface INatsCommand +public interface INatsConnection { - ValueTask FlushAsync(CancellationToken cancellationToken = default); - /// /// Send PING command and await PONG. Return value is similar as Round Trip Time (RTT). /// @@ -69,6 +67,4 @@ public interface INatsCommand /// Specifies the type of data that may be received from the NATS Server. /// A that represents the asynchronous send operation. ValueTask> SubscribeAsync(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default); - - IObservable AsObservable(string subject); } diff --git a/src/NATS.Client.Core/INatsSub.cs b/src/NATS.Client.Core/INatsSub.cs new file mode 100644 index 000000000..3546a6500 --- /dev/null +++ b/src/NATS.Client.Core/INatsSub.cs @@ -0,0 +1,49 @@ +using System.Buffers; +using System.Collections.Concurrent; + +namespace NATS.Client.Core; + +internal interface INatsSub : IAsyncDisposable +{ + string Subject { get; } + + string? QueueGroup { get; } + + int Sid { get; } + + ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer); +} + +internal interface INatsSubBuilder + where T : INatsSub +{ + T Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager); +} + +internal class NatsSubBuilder : INatsSubBuilder +{ + public static readonly NatsSubBuilder Default = new(); + + public NatsSub Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager) + { + var sid = manager.GetNextSid(); + return new NatsSub(connection, manager, subject, queueGroup, sid); + } +} + +internal class NatsSubModelBuilder : INatsSubBuilder> +{ + private static readonly ConcurrentDictionary> Cache = new(); + private readonly INatsSerializer _serializer; + + public NatsSubModelBuilder(INatsSerializer serializer) => _serializer = serializer; + + public static NatsSubModelBuilder For(INatsSerializer serializer) => + Cache.GetOrAdd(serializer, static s => new NatsSubModelBuilder(s)); + + public NatsSub Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager) + { + var sid = manager.GetNextSid(); + return new NatsSub(connection, manager, subject, queueGroup, sid, _serializer); + } +} diff --git a/src/NATS.Client.Core/NatsPipeliningWriteProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs similarity index 99% rename from src/NATS.Client.Core/NatsPipeliningWriteProtocolProcessor.cs rename to src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs index 3cd192d8a..fba637149 100644 --- a/src/NATS.Client.Core/NatsPipeliningWriteProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs @@ -2,9 +2,8 @@ using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Client.Core.Commands; -using NATS.Client.Core.Internal; -namespace NATS.Client.Core; +namespace NATS.Client.Core.Internal; internal sealed class NatsPipeliningWriteProtocolProcessor : IAsyncDisposable { diff --git a/src/NATS.Client.Core/NatsReadProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs similarity index 99% rename from src/NATS.Client.Core/NatsReadProtocolProcessor.cs rename to src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs index a2a788caf..524dd6d57 100644 --- a/src/NATS.Client.Core/NatsReadProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs @@ -8,9 +8,8 @@ using System.Text.Json; using Microsoft.Extensions.Logging; using NATS.Client.Core.Commands; -using NATS.Client.Core.Internal; -namespace NATS.Client.Core; +namespace NATS.Client.Core.Internal; internal sealed class NatsReadProtocolProcessor : IAsyncDisposable { diff --git a/src/NATS.Client.Core/NatsUri.cs b/src/NATS.Client.Core/Internal/NatsUri.cs similarity index 97% rename from src/NATS.Client.Core/NatsUri.cs rename to src/NATS.Client.Core/Internal/NatsUri.cs index 092f2e544..f792b8089 100644 --- a/src/NATS.Client.Core/NatsUri.cs +++ b/src/NATS.Client.Core/Internal/NatsUri.cs @@ -1,4 +1,4 @@ -namespace NATS.Client.Core; +namespace NATS.Client.Core.Internal; internal sealed class NatsUri : IEquatable { diff --git a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs new file mode 100644 index 000000000..2192085f0 --- /dev/null +++ b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs @@ -0,0 +1,76 @@ +using System.Buffers; +using NATS.Client.Core.Commands; + +namespace NATS.Client.Core; + +public partial class NatsConnection +{ + internal ValueTask PubAsync(string subject, string? replyTo = default, ReadOnlySequence payload = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) + { + headers?.SetReadOnly(); + + if (ConnectionState == NatsConnectionState.Open) + { + var command = AsyncPublishBytesCommand.Create(_pool, GetCommandTimer(cancellationToken), subject, replyTo, headers, payload); + if (TryEnqueueCommand(command)) + { + return command.AsValueTask(); + } + else + { + return EnqueueAndAwaitCommandAsync(command); + } + } + else + { + return WithConnectAsync(subject, replyTo, headers, payload, cancellationToken, static (self, s, r, h, p, token) => + { + var command = AsyncPublishBytesCommand.Create(self._pool, self.GetCommandTimer(token), s, r, h, p); + return self.EnqueueAndAwaitCommandAsync(command); + }); + } + } + + internal ValueTask PubModelAsync(string subject, T data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) + { + headers?.SetReadOnly(); + + if (ConnectionState == NatsConnectionState.Open) + { + var command = AsyncPublishCommand.Create(_pool, GetCommandTimer(cancellationToken), subject, replyTo, headers, data, serializer); + if (TryEnqueueCommand(command)) + { + return command.AsValueTask(); + } + else + { + return EnqueueAndAwaitCommandAsync(command); + } + } + else + { + return WithConnectAsync(subject, replyTo, headers, data, serializer, cancellationToken, static (self, s, r, h, v, ser, token) => + { + var command = AsyncPublishCommand.Create(self._pool, self.GetCommandTimer(token), s, r, h, v, ser); + return self.EnqueueAndAwaitCommandAsync(command); + }); + } + } + + internal ValueTask SubAsync(string subject, string? queueGroup, INatsSubBuilder builder, CancellationToken cancellationToken = default) + where T : INatsSub + { + var sub = builder.Build(subject, queueGroup, this, _subscriptionManager); + if (ConnectionState == NatsConnectionState.Open) + { + return _subscriptionManager.SubscribeAsync(subject, queueGroup, sub, cancellationToken); + } + else + { + return WithConnectAsync(subject, queueGroup, sub, cancellationToken, static (self, s, qg, sb, token) => + { + return self._subscriptionManager.SubscribeAsync(s, qg, sb, token); + }); + } + } +} diff --git a/src/NATS.Client.Core/NatsConnection.Other.cs b/src/NATS.Client.Core/NatsConnection.Other.cs deleted file mode 100644 index a130a8887..000000000 --- a/src/NATS.Client.Core/NatsConnection.Other.cs +++ /dev/null @@ -1,35 +0,0 @@ -using NATS.Client.Core.Commands; - -namespace NATS.Client.Core; - -public partial class NatsConnection -{ - public IObservable AsObservable(string subject) - { - return new NatsObservable(this, subject); - } - - public ValueTask FlushAsync(CancellationToken cancellationToken = default) - { - if (ConnectionState == NatsConnectionState.Open) - { - var command = AsyncFlushCommand.Create(_pool, GetCommandTimer(cancellationToken)); - if (TryEnqueueCommand(command)) - { - return command.AsValueTask(); - } - else - { - return EnqueueAndAwaitCommandAsync(command); - } - } - else - { - return WithConnectAsync(cancellationToken, static (self, token) => - { - var command = AsyncFlushCommand.Create(self._pool, self.GetCommandTimer(token)); - return self.EnqueueAndAwaitCommandAsync(command); - }); - } - } -} diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index 07bf1f959..a41ebbc6c 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -1,5 +1,4 @@ using System.Buffers; -using NATS.Client.Core.Commands; namespace NATS.Client.Core; @@ -8,31 +7,7 @@ public partial class NatsConnection /// public ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { - var replyTo = opts?.ReplyTo; - var headers = opts?.Headers; - - headers?.SetReadOnly(); - - if (ConnectionState == NatsConnectionState.Open) - { - var command = AsyncPublishBytesCommand.Create(_pool, GetCommandTimer(cancellationToken), subject, replyTo, headers, payload); - if (TryEnqueueCommand(command)) - { - return command.AsValueTask(); - } - else - { - return EnqueueAndAwaitCommandAsync(command); - } - } - else - { - return WithConnectAsync(subject, replyTo, headers, payload, cancellationToken, static (self, s, r, h, p, token) => - { - var command = AsyncPublishBytesCommand.Create(self._pool, self.GetCommandTimer(token), s, r, h, p); - return self.EnqueueAndAwaitCommandAsync(command); - }); - } + return PubAsync(subject, opts?.ReplyTo, payload, opts?.Headers, cancellationToken); } /// @@ -44,32 +19,8 @@ public ValueTask PublishAsync(NatsMsg msg, CancellationToken cancellationToken = /// public ValueTask PublishAsync(string subject, T data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { - var replyTo = opts?.ReplyTo; var serializer = opts?.Serializer ?? Options.Serializer; - var headers = opts?.Headers; - - headers?.SetReadOnly(); - - if (ConnectionState == NatsConnectionState.Open) - { - var command = AsyncPublishCommand.Create(_pool, GetCommandTimer(cancellationToken), subject, replyTo, headers, data, serializer); - if (TryEnqueueCommand(command)) - { - return command.AsValueTask(); - } - else - { - return EnqueueAndAwaitCommandAsync(command); - } - } - else - { - return WithConnectAsync(subject, replyTo, headers, data, serializer, cancellationToken, static (self, s, r, h, v, ser, token) => - { - var command = AsyncPublishCommand.Create(self._pool, self.GetCommandTimer(token), s, r, h, v, ser); - return self.EnqueueAndAwaitCommandAsync(command); - }); - } + return PubModelAsync(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken); } /// diff --git a/src/NATS.Client.Core/NatsConnection.Subscribe.cs b/src/NATS.Client.Core/NatsConnection.Subscribe.cs index bbacf7f03..5da367acf 100644 --- a/src/NATS.Client.Core/NatsConnection.Subscribe.cs +++ b/src/NATS.Client.Core/NatsConnection.Subscribe.cs @@ -1,3 +1,5 @@ +using System.Collections.Concurrent; + namespace NATS.Client.Core; public partial class NatsConnection @@ -5,36 +7,13 @@ public partial class NatsConnection /// public ValueTask SubscribeAsync(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { - var queueGroup = opts?.QueueGroup; - if (ConnectionState == NatsConnectionState.Open) - { - return _subscriptionManager.AddAsync(subject, queueGroup, cancellationToken); - } - else - { - return WithConnectAsync(subject, queueGroup, cancellationToken, static (self, key, qg, token) => - { - return self._subscriptionManager.AddAsync(key, qg, token); - }); - } + return SubAsync(subject, opts?.QueueGroup, NatsSubBuilder.Default, cancellationToken); } /// public ValueTask> SubscribeAsync(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { - var queueGroup = opts?.QueueGroup; var serializer = opts?.Serializer ?? Options.Serializer; - - if (ConnectionState == NatsConnectionState.Open) - { - return _subscriptionManager.AddAsync(subject, queueGroup, serializer, cancellationToken); - } - else - { - return WithConnectAsync(subject, queueGroup, serializer, cancellationToken, static (self, s, qg, ser, token) => - { - return self._subscriptionManager.AddAsync(s, qg, ser, token); - }); - } + return SubAsync>(subject, opts?.QueueGroup, NatsSubModelBuilder.For(serializer), cancellationToken); } } diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index af53645bf..f1e2924a5 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -16,7 +16,7 @@ public enum NatsConnectionState Reconnecting, } -public partial class NatsConnection : IAsyncDisposable, INatsCommand +public partial class NatsConnection : IAsyncDisposable, INatsConnection { #pragma warning disable SA1401 /// @@ -90,7 +90,7 @@ public NatsConnection(NatsOptions options) public ServerInfo? ServerInfo { get; internal set; } // server info is set when received INFO - internal HeaderParser HeaderParser { get; } + public HeaderParser HeaderParser { get; } /// /// Connect socket and write CONNECT command to nats server. diff --git a/src/NATS.Client.Core/NatsConnectionPool.cs b/src/NATS.Client.Core/NatsConnectionPool.cs index ff0803096..ac824c423 100644 --- a/src/NATS.Client.Core/NatsConnectionPool.cs +++ b/src/NATS.Client.Core/NatsConnectionPool.cs @@ -52,7 +52,7 @@ public IEnumerable GetConnections() } } - public INatsCommand GetCommand() + public INatsConnection GetCommand() { return GetConnection(); } diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index f3b84413e..e7bba8442 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -3,18 +3,13 @@ namespace NATS.Client.Core; -public record NatsMsg(string Subject, ReadOnlyMemory Data) : NatsMsgBase(Subject); - -public record NatsMsg(string Subject, T Data) : NatsMsgBase(Subject); - -public abstract record NatsMsgBase(string Subject) +public readonly record struct NatsMsg( + string Subject, + string? ReplyTo, + NatsHeaders? Headers, + ReadOnlyMemory Data, + INatsConnection? Connection) { - internal INatsCommand? Connection { get; init; } - - public string? ReplyTo { get; init; } - - public NatsHeaders? Headers { get; init; } - public ValueTask ReplyAsync(ReadOnlySequence data = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); @@ -27,6 +22,28 @@ public ValueTask ReplyAsync(NatsMsg msg, CancellationToken cancellationToken = d return Connection.PublishAsync(msg with { Subject = ReplyTo! }, cancellationToken); } + [MemberNotNull(nameof(Connection))] + private void CheckReplyPreconditions() + { + if (Connection == default) + { + throw new NatsException("unable to send reply; message did not originate from a subscription"); + } + + if (string.IsNullOrWhiteSpace(ReplyTo)) + { + throw new NatsException("unable to send reply; ReplyTo is empty"); + } + } +} + +public readonly record struct NatsMsg( + string Subject, + string? ReplyTo, + NatsHeaders? Headers, + T? Data, + INatsConnection? Connection) +{ public ValueTask ReplyAsync(TReply data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); diff --git a/src/NATS.Client.Core/NatsObservable.cs b/src/NATS.Client.Core/NatsObservable.cs deleted file mode 100644 index cc3bff612..000000000 --- a/src/NATS.Client.Core/NatsObservable.cs +++ /dev/null @@ -1,129 +0,0 @@ -namespace NATS.Client.Core; - -internal sealed class NatsObservable : IObservable -{ - private readonly NatsConnection _connection; - private readonly string _subject; - - public NatsObservable(NatsConnection connection, string subject) - { - _subject = subject; - _connection = connection; - } - - public IDisposable Subscribe(IObserver observer) - { - var disp = new CancellationTokenDisposable(); - var disp2 = new FireAndForgetDisposable(_connection.SubscribeAsync(_subject, cancellationToken: disp.Token), observer); - return new Tuple2Disposable(disp, disp2); - } - - private sealed class FireAndForgetDisposable : IDisposable - { - private readonly IObserver _observer; - private bool _disposed; - private IAsyncDisposable? _taskDisposable; - private object _gate = new object(); - - public FireAndForgetDisposable(ValueTask> natsSub, IObserver observer) - { - _observer = observer; - _disposed = false; - FireAndForget(natsSub, observer); - } - - public void Dispose() - { - lock (_gate) - { - _disposed = true; - if (_taskDisposable != null) - { - _ = _taskDisposable.DisposeAsync(); - } - } - } - - private async void FireAndForget(ValueTask> natsSub, IObserver observer) - { - try - { - var sub = await natsSub.ConfigureAwait(false); - - // TODO: Consider removing observable support from the API - // * Channels and observables don't go together very well and creating a generic solution is - // problematic. An avid RX developer should be able to hook the channel up easily for their - // scenario. - // * Below workaround isn't very well thought out and will probably create bugs except maybe - // in simple scenarios. - _ = Task.Run(async () => - { - await foreach (var msg in sub.Msgs.ReadAllAsync()) - { - observer.OnNext(msg.Data); - } - }); - - _taskDisposable = sub; - - lock (_gate) - { - if (_disposed) - { - _ = _taskDisposable.DisposeAsync(); - } - } - } - catch (Exception ex) - { - lock (_gate) - { - _disposed = true; - } - - _observer.OnError(ex); - } - } - } - - private class CancellationTokenDisposable : IDisposable - { - private readonly CancellationTokenSource _cancellationTokenSource; - private bool _disposed; - - public CancellationTokenDisposable() - { - _cancellationTokenSource = new CancellationTokenSource(); - } - - public CancellationToken Token => _cancellationTokenSource.Token; - - public void Dispose() - { - if (!_disposed) - { - _disposed = true; - _cancellationTokenSource.Cancel(); - _cancellationTokenSource.Dispose(); - } - } - } - - private class Tuple2Disposable : IDisposable - { - private readonly IDisposable _disposable1; - private readonly IDisposable _disposable2; - - public Tuple2Disposable(IDisposable disposable1, IDisposable disposable2) - { - _disposable1 = disposable1; - _disposable2 = disposable2; - } - - public void Dispose() - { - _disposable1.Dispose(); - _disposable2.Dispose(); - } - } -} diff --git a/src/NATS.Client.Core/NatsOptions.cs b/src/NATS.Client.Core/NatsOptions.cs index 047c05b42..2b1d8d195 100644 --- a/src/NATS.Client.Core/NatsOptions.cs +++ b/src/NATS.Client.Core/NatsOptions.cs @@ -1,6 +1,7 @@ using System.Text; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using NATS.Client.Core.Internal; namespace NATS.Client.Core; diff --git a/src/NATS.Client.Core/NatsReplyHandle.cs b/src/NATS.Client.Core/NatsReplyHandle.cs index 9725d368f..edab786d5 100644 --- a/src/NATS.Client.Core/NatsReplyHandle.cs +++ b/src/NATS.Client.Core/NatsReplyHandle.cs @@ -4,7 +4,7 @@ namespace NATS.Client.Core; public static class NatReplyUtils { - public static async Task ReplyAsync(this INatsCommand nats, string subject, Func reply) + public static async Task ReplyAsync(this INatsConnection nats, string subject, Func reply) { var sub = await nats.SubscribeAsync(subject).ConfigureAwait(false); var reader = Task.Run(async () => @@ -25,7 +25,7 @@ public static async Task ReplyAsync(this I return new NatsReplyHandle(sub, reader); } - public static async Task RequestAsync(this INatsCommand nats, string subject, TRequest request) + public static async Task RequestAsync(this INatsConnection nats, string subject, TRequest request) { var bs = ((NatsConnection)nats).InboxPrefix.ToArray(); // TODO: Fix inbox prefix var replyTo = $"{Encoding.ASCII.GetString(bs)}{Guid.NewGuid():N}"; diff --git a/src/NATS.Client.Core/NatsShardingConnection.cs b/src/NATS.Client.Core/NatsShardingConnection.cs deleted file mode 100644 index a66e7c60d..000000000 --- a/src/NATS.Client.Core/NatsShardingConnection.cs +++ /dev/null @@ -1,111 +0,0 @@ -using System.Buffers.Binary; -using System.IO.Hashing; -using System.Runtime.CompilerServices; - -namespace NATS.Client.Core; - -public readonly struct ShardringNatsCommand -{ - private readonly NatsConnection _connection; - private readonly string _subject; - - public ShardringNatsCommand(NatsConnection connection, string subject) - { - _connection = connection; - _subject = subject; - } - - public NatsConnection GetConnection() => _connection; - - public IObservable AsObservable() => _connection.AsObservable(_subject); - - public ValueTask FlushAsync() => _connection.FlushAsync(); - - public ValueTask PingAsync() => _connection.PingAsync(); - - public ValueTask PublishAsync() => _connection.PublishAsync(_subject); - - public ValueTask PublishAsync(byte[] value) => _connection.PublishAsync(_subject, value); - - public ValueTask PublishAsync(ReadOnlyMemory value) => _connection.PublishAsync(_subject, value); - - public ValueTask PublishAsync(T value) => _connection.PublishAsync(_subject, value); - - public Task RequestAsync(TRequest request) => _connection.RequestAsync(_subject, request); - - public Task ReplyAsync(Func reply) => _connection.ReplyAsync(_subject, reply); - - public ValueTask> SubscribeAsync() => _connection.SubscribeAsync(_subject); - - public ValueTask SubscribeAsync(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default) => - _connection.SubscribeAsync(subject, opts, cancellationToken); -} - -public sealed class NatsShardingConnection : IAsyncDisposable -{ - private readonly NatsConnectionPool[] _pools; - - public NatsShardingConnection(int poolSize, NatsOptions options, string[] urls) - : this(poolSize, options, urls, _ => { }) - { - } - - public NatsShardingConnection(int poolSize, NatsOptions options, string[] urls, Action configureConnection) - { - poolSize = Math.Max(1, poolSize); - _pools = new NatsConnectionPool[urls.Length]; - for (var i = 0; i < urls.Length; i++) - { - _pools[i] = new NatsConnectionPool(poolSize, options with { Url = urls[i] }, configureConnection); - } - } - - public IEnumerable GetConnections() - { - foreach (var item in _pools) - { - foreach (var conn in item.GetConnections()) - { - yield return conn; - } - } - } - - public ShardringNatsCommand GetCommand(string subject) - { - Validate(subject); - var i = GetHashIndex(subject); - var pool = _pools[i]; - return new ShardringNatsCommand(pool.GetConnection(), subject); - } - - public async ValueTask DisposeAsync() - { - foreach (var item in _pools) - { - await item.DisposeAsync().ConfigureAwait(false); - } - } - - // TODO: Unsafe really needed? - // Benchmark without SkipLocalsInit to see if the performance impact is negligible - [SkipLocalsInit] - private int GetHashIndex(string key) - { - var source = System.Runtime.InteropServices.MemoryMarshal.AsBytes(key.AsSpan()); - Span destination = stackalloc byte[4]; - XxHash32.TryHash(source, destination, out var _); - - var hash = BinaryPrimitives.ReadUInt32BigEndian(destination); // xxhash spec is big-endian - var v = hash % (uint)_pools.Length; - return (int)v; - } - - private void Validate(string key) - { - if (key.AsSpan().IndexOfAny('*', '>') != -1) - { - throw new ArgumentException($"Wild card is not supported in sharding connection. Key:{key}"); - } - } -} diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSub.cs index 77cd2d1b3..5cbf58a15 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSub.cs @@ -1,11 +1,9 @@ using System.Buffers; -using System.Text; using System.Threading.Channels; -using NATS.Client.Core.Internal; namespace NATS.Client.Core; -public abstract class NatsSubBase : IAsyncDisposable +public abstract class NatsSubBase : INatsSub { internal NatsSubBase(NatsConnection connection, SubscriptionManager manager, string subject, string? queueGroup, int sid) { @@ -20,7 +18,7 @@ internal NatsSubBase(NatsConnection connection, SubscriptionManager manager, str public string? QueueGroup { get; } - internal int Sid { get; } + public int Sid { get; } internal NatsConnection Connection { get; } @@ -31,7 +29,7 @@ public virtual ValueTask DisposeAsync() return Manager.RemoveAsync(Sid); } - internal abstract ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer); + public abstract ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer); } public sealed class NatsSub : NatsSubBase @@ -57,7 +55,7 @@ public override ValueTask DisposeAsync() return base.DisposeAsync(); } - internal override ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) + public override ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) { NatsHeaders? natsHeaders = null; if (headersBuffer != null) @@ -71,12 +69,7 @@ internal override ValueTask ReceiveAsync(string subject, string? replyTo, in Rea natsHeaders.SetReadOnly(); } - return _msgs.Writer.WriteAsync(new NatsMsg(subject, payloadBuffer.ToArray()) - { - Connection = Connection, - ReplyTo = replyTo, - Headers = natsHeaders, - }); + return _msgs.Writer.WriteAsync(new NatsMsg(subject, replyTo, natsHeaders, payloadBuffer.ToArray(), Connection)); } } @@ -103,7 +96,7 @@ public override ValueTask DisposeAsync() return base.DisposeAsync(); } - internal override ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) + public override ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) { var serializer = Serializer; var data = serializer.Deserialize(payloadBuffer); @@ -120,11 +113,6 @@ internal override ValueTask ReceiveAsync(string subject, string? replyTo, in Rea natsHeaders.SetReadOnly(); } - return _msgs.Writer.WriteAsync(new NatsMsg(subject, data!) - { - Connection = Connection, - ReplyTo = replyTo, - Headers = natsHeaders, - }); + return _msgs.Writer.WriteAsync(new NatsMsg(subject, replyTo, natsHeaders, data, Connection)); } } diff --git a/src/NATS.Client.Core/SubscriptionManager.cs b/src/NATS.Client.Core/SubscriptionManager.cs index 50d5b8b97..b1ef6bef9 100644 --- a/src/NATS.Client.Core/SubscriptionManager.cs +++ b/src/NATS.Client.Core/SubscriptionManager.cs @@ -1,6 +1,5 @@ using System.Buffers; using System.Collections.Concurrent; -using System.Runtime.CompilerServices; using Microsoft.Extensions.Logging; namespace NATS.Client.Core; @@ -10,7 +9,7 @@ internal sealed class SubscriptionManager : IAsyncDisposable private readonly ILogger _logger; private readonly object _gate = new(); private readonly NatsConnection _connection; - private readonly ConcurrentDictionary> _bySid = new(); + private readonly ConcurrentDictionary> _bySid = new(); private readonly CancellationTokenSource _cts; private readonly Task _timer; private readonly TimeSpan _cleanupInterval; @@ -40,20 +39,26 @@ public SubscriptionManager(NatsConnection connection) } } - public async ValueTask AddAsync(string subject, string? queueGroup, CancellationToken cancellationToken) - { - var sid = Interlocked.Increment(ref _sid); - var sub = new NatsSub(_connection, this, subject, queueGroup, sid); - await SubAsync(subject, queueGroup, cancellationToken, sid, sub).ConfigureAwait(false); - return sub; - } + public int GetNextSid() => Interlocked.Increment(ref _sid); - public async ValueTask> AddAsync(string subject, string? queueGroup, INatsSerializer serializer, CancellationToken cancellationToken) + public async ValueTask SubscribeAsync(string subject, string? queueGroup, T sub, CancellationToken cancellationToken) + where T : INatsSub { - var sid = Interlocked.Increment(ref _sid); - var sub = new NatsSub(_connection, this, subject, queueGroup, sid, serializer); - await SubAsync(subject, queueGroup, cancellationToken, sid, sub).ConfigureAwait(false); - return sub; + lock (_gate) + { + _bySid[sub.Sid] = new WeakReference(sub); + } + + try + { + await _connection.SubscribeCoreAsync(sub.Sid, subject, queueGroup, cancellationToken).ConfigureAwait(false); + return sub; + } + catch + { + await sub.DisposeAsync().ConfigureAwait(false); + throw; + } } public ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) @@ -98,7 +103,7 @@ public async ValueTask DisposeAsync() { _cts.Cancel(); - WeakReference[] subRefs; + WeakReference[] subRefs; lock (_gate) { subRefs = _bySid.Values.ToArray(); @@ -112,7 +117,7 @@ public async ValueTask DisposeAsync() } } - internal ValueTask RemoveAsync(int sid) + public ValueTask RemoveAsync(int sid) { lock (_gate) _bySid.Remove(sid, out _); @@ -163,23 +168,4 @@ private async ValueTask UnsubscribeSidsAsync(List sids) } } } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private async Task SubAsync(string subject, string? queueGroup, CancellationToken cancellationToken, int sid, NatsSubBase sub) - { - lock (_gate) - { - _bySid[sid] = new WeakReference(sub); - } - - try - { - await _connection.SubscribeCoreAsync(sid, subject, queueGroup, cancellationToken).ConfigureAwait(false); - } - catch - { - await sub.DisposeAsync().ConfigureAwait(false); - throw; - } - } } diff --git a/src/NATS.Client.Hosting/NatsHostingExtensions.cs b/src/NATS.Client.Hosting/NatsHostingExtensions.cs index 196671b38..1509c3ff6 100644 --- a/src/NATS.Client.Hosting/NatsHostingExtensions.cs +++ b/src/NATS.Client.Hosting/NatsHostingExtensions.cs @@ -34,7 +34,7 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p return pool.GetConnection(); }); - services.TryAddTransient(static provider => + services.TryAddTransient(static provider => { var pool = provider.GetRequiredService(); return pool.GetCommand(); @@ -59,7 +59,7 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p return conn; }); - services.TryAddSingleton(static provider => + services.TryAddSingleton(static provider => { return provider.GetRequiredService(); }); @@ -67,23 +67,4 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p return services; } - - /// - /// Add Singleton NatsShardingConnection to ServiceCollection. - /// - public static IServiceCollection AddNats(this IServiceCollection services, int poolSize, string[] urls, Func? configureOptions = null, Action? configureConnection = null) - { - services.TryAddSingleton(provider => - { - var options = NatsOptions.Default with { LoggerFactory = provider.GetRequiredService() }; - if (configureOptions != null) - { - options = configureOptions(options); - } - - return new NatsShardingConnection(poolSize, options, urls, configureConnection ?? (_ => { })); - }); - - return services; - } } diff --git a/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs b/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs new file mode 100644 index 000000000..042dc94a2 --- /dev/null +++ b/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs @@ -0,0 +1,130 @@ +using System.Buffers; +using System.Text; + +namespace NATS.Client.Core.Tests; + +public class LowLevelApiTest +{ + private readonly ITestOutputHelper _output; + + public LowLevelApiTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Sub_custom_builder_test() + { + await using var server = new NatsServer(); + var nats = server.CreateClientConnection(); + + var builder = new NatsSubCustomTestBuilder(_output); + NatsSubTest sub = await nats.SubAsync("foo.*", null, builder); + + await Retry.Until( + "subscription is ready", + () => builder.IsSynced, + async () => await nats.PubAsync("foo.sync")); + + for (int i = 0; i < 10; i++) + { + var headers = new NatsHeaders { { "X-Test", $"value-{i}" } }; + await nats.PubModelAsync($"foo.data{i}", i, JsonNatsSerializer.Default, "bar", headers); + } + + await nats.PubAsync("foo.done"); + await builder.Done; + + Assert.Equal(10, builder.Messages.Count()); + + await sub.DisposeAsync(); + } + + private class NatsSubTest : INatsSub + { + private readonly NatsSubCustomTestBuilder _builder; + private readonly ITestOutputHelper _output; + private readonly SubscriptionManager _manager; + + public NatsSubTest(NatsSubCustomTestBuilder builder, ITestOutputHelper output, SubscriptionManager manager) + { + _builder = builder; + _output = output; + _manager = manager; + } + + public string Subject => string.Empty; + + public string QueueGroup => string.Empty; + + public int Sid => 0; + + public ValueTask DisposeAsync() => _manager.DisposeAsync(); + + public ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) + { + if (subject.EndsWith(".sync")) + { + _builder.Sync(); + } + else if (subject.EndsWith(".done")) + { + _builder.MarkAsDone(); + } + else + { + byte[]? headers = headersBuffer?.ToArray(); + byte[] payload = payloadBuffer.ToArray(); + + var sb = new StringBuilder(); + sb.AppendLine($"Subject: {subject}"); + sb.AppendLine($"Reply-To: {replyTo}"); + sb.Append($"Headers: "); + if (headers != null) + sb.Append(Encoding.ASCII.GetString(headers).Replace("\r\n", " ")); + sb.AppendLine(); + sb.AppendLine($"Payload: {Encoding.ASCII.GetString(payload)}"); + + _output.WriteLine(sb.ToString()); + + _builder.MessageReceived(sb.ToString()); + } + + return ValueTask.CompletedTask; + } + } + + private class NatsSubCustomTestBuilder : INatsSubBuilder + { + private readonly ITestOutputHelper _output; + private readonly WaitSignal _done = new(); + private readonly List _messages = new(); + private int _sync; + + public NatsSubCustomTestBuilder(ITestOutputHelper output) => _output = output; + + public bool IsSynced => Volatile.Read(ref _sync) == 1; + + public WaitSignal Done => _done; + + public IEnumerable Messages + { + get + { + lock (_messages) + return _messages.ToArray(); + } + } + + public NatsSubTest Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager) + { + return new NatsSubTest(this, _output, manager); + } + + public void Sync() => Interlocked.Exchange(ref _sync, 1); + + public void MarkAsDone() => _done.Pulse(); + + public void MessageReceived(string message) + { + lock (_messages) _messages.Add(message); + } + } +} diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Sharding.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Sharding.cs deleted file mode 100644 index 230036f19..000000000 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Sharding.cs +++ /dev/null @@ -1,68 +0,0 @@ -namespace NATS.Client.Core.Tests; - -public abstract partial class NatsConnectionTest -{ - // TODO:do. - [Fact] - public async Task ConnectionPoolTest() - { - await using var server = new NatsServer(_output, _transportType); - - var conn = server.CreatePooledClientConnection(); - - var a = conn.GetConnection(); - var b = conn.GetConnection(); - var c = conn.GetConnection(); - var d = conn.GetConnection(); - var e = conn.GetConnection(); - - a.Should().Be(e); - conn.GetConnections().ToArray().Length.ShouldBe(4); - new[] { a, b, c, d, e }.Distinct().Count().Should().Be(4); - } - - [Fact] - public async Task ShardingConnectionTest() - { - await using var server1 = new NatsServer(_output, _transportType); - await using var server2 = new NatsServer(_output, _transportType); - await using var server3 = new NatsServer(_output, _transportType); - - var urls = new[] { server1, server2, server3 } - .Select(s => s.ClientUrl).ToArray(); - var shardedConnection = new NatsShardingConnection(1, server1.ClientOptions(NatsOptions.Default), urls); - - var l1 = new List(); - var l2 = new List(); - var l3 = new List(); - var sub1 = await shardedConnection.GetCommand("foo").SubscribeAsync(); - var reg1 = sub1.Register(msg => l1.Add(msg.Data)); - var sub2 = await shardedConnection.GetCommand("bar").SubscribeAsync(); - var reg2 = sub2.Register(msg => l2.Add(msg.Data)); - var sub3 = await shardedConnection.GetCommand("baz").SubscribeAsync(); - var reg3 = sub3.Register(msg => l3.Add(msg.Data)); - - await shardedConnection.GetCommand("foo").PublishAsync(10); - await shardedConnection.GetCommand("bar").PublishAsync(20); - await shardedConnection.GetCommand("baz").PublishAsync(30); - - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - l1.ShouldEqual(10); - l2.ShouldEqual(20); - l3.ShouldEqual(30); - - await shardedConnection.GetCommand("foobarbaz").ReplyAsync((int x) => x * x); - - var r = await shardedConnection.GetCommand("foobarbaz").RequestAsync(100); - - r.ShouldBe(10000); - - await sub1.DisposeAsync(); - await reg1; - await sub2.DisposeAsync(); - await reg2; - await sub3.DisposeAsync(); - await reg3; - } -}