diff --git a/src/NATS.Client.Core/INatsSerializer.cs b/src/NATS.Client.Core/INatsSerializer.cs index d1c9c7743..51254c611 100644 --- a/src/NATS.Client.Core/INatsSerializer.cs +++ b/src/NATS.Client.Core/INatsSerializer.cs @@ -9,6 +9,8 @@ public interface INatsSerializer int Serialize(ICountableBufferWriter bufferWriter, T? value); T? Deserialize(in ReadOnlySequence buffer); + + object? Deserialize(in ReadOnlySequence buffer, Type type); } public interface ICountableBufferWriter : IBufferWriter @@ -63,6 +65,12 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) return JsonSerializer.Deserialize(ref reader, _options); } + public object? Deserialize(in ReadOnlySequence buffer, Type type) + { + var reader = new Utf8JsonReader(buffer); // Utf8JsonReader is ref struct, no allocate. + return JsonSerializer.Deserialize(ref reader, type, _options); + } + private sealed class NullBufferWriter : IBufferWriter { internal static readonly IBufferWriter Instance = new NullBufferWriter(); diff --git a/src/NATS.Client.Core/InboxSubscriber.cs b/src/NATS.Client.Core/InboxSubscriber.cs new file mode 100644 index 000000000..6e0127dc5 --- /dev/null +++ b/src/NATS.Client.Core/InboxSubscriber.cs @@ -0,0 +1,203 @@ +using System.Buffers; +using System.Collections.Concurrent; +using System.Runtime.ExceptionServices; +using System.Threading.Tasks.Sources; +using Microsoft.Extensions.Logging; +using NATS.Client.Core.Internal; + +namespace NATS.Client.Core; + +internal class InboxSubscriber : INatsSubBuilder, IAsyncDisposable +{ + private readonly ILogger _logger; + private readonly string? _queueGroup; + private readonly ConcurrentDictionary _writers = new(); + private readonly string _prefix; + private InboxSub? _sub; + private bool _started; + + public InboxSubscriber( + NatsConnection connection, + string? queueGroup = default, + string prefix1 = "_INBOX", + string? prefix2 = default) + { + _logger = connection.Options.LoggerFactory.CreateLogger(); + _queueGroup = queueGroup; + prefix2 ??= Guid.NewGuid().ToString("N"); + _prefix = $"{prefix1}.{prefix2}"; + Connection = connection; + } + + private NatsConnection Connection { get; } + + public async ValueTask EnsureStartedAsync() + { + // Only one call to this method can start the subscription. + lock (this) + { + if (_started) + { + return; + } + + _started = true; + } + + try + { + _sub = await Connection.SubAsync($"{_prefix}.*", _queueGroup, this) + .ConfigureAwait(false); + } + catch + { + // There is a race here when there are two or more calls to this method and + // the first one fails to subscribe, other calls might carry on without exception. + // While first call would produce the correct exception, subsequent calls will only + // fail with a timeout. We reset here to allow retries to subscribe again. + lock (this) _started = false; + throw; + } + } + + public InboxSub Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager) + { + var sid = manager.GetNextSid(); + return new InboxSub(this, subject, queueGroup, sid, connection, manager); + } + + public string Register(MsgWrapper msg, string? suffix = null) + { + suffix ??= Guid.NewGuid().ToString("N"); + var subject = $"{_prefix}.{suffix}"; + if (!_writers.TryAdd(subject, msg)) + throw new InvalidOperationException("Subject already registered"); + return subject; + } + + public void Unregister(string subject) => _writers.TryRemove(subject, out _); + + public void Received(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer, NatsConnection connection) + { + if (!_writers.TryGetValue(subject, out var msgWrapper)) + { + _logger.LogWarning("Unregistered message inbox received"); + return; + } + + msgWrapper.MsgReceived(subject, replyTo, headersBuffer, payloadBuffer, connection); + } + + public ValueTask DisposeAsync() + { + if (_sub != null) + return _sub.DisposeAsync(); + return ValueTask.CompletedTask; + } +} + +internal class InboxSub : INatsSub +{ + private readonly InboxSubscriber _inbox; + private readonly NatsConnection _connection; + private readonly SubscriptionManager _manager; + + public InboxSub( + InboxSubscriber inbox, + string subject, + string? queueGroup, + int sid, + NatsConnection connection, + SubscriptionManager manager) + { + _inbox = inbox; + _connection = connection; + _manager = manager; + Subject = subject; + QueueGroup = queueGroup; + Sid = sid; + } + + public string Subject { get; } + + public string? QueueGroup { get; } + + public int Sid { get; } + + public ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) + { + _inbox.Received(subject, replyTo, headersBuffer, payloadBuffer, _connection); + return ValueTask.CompletedTask; + } + + public ValueTask DisposeAsync() => _manager.RemoveAsync(Sid); +} + +internal class MsgWrapper : IValueTaskSource, IObjectPoolNode +{ + private ManualResetValueTaskSourceCore _core = new() + { + RunContinuationsAsynchronously = true, + }; + + private INatsSerializer? _serializer; + private Type? _type; + private MsgWrapper? _next; + private CancellationTokenRegistration _ctr; + + public ref MsgWrapper? NextNode => ref _next; + + public void SetSerializer(INatsSerializer serializer, CancellationToken cancellationToken) + { + _serializer = serializer; + _type = typeof(TData); + _ctr = cancellationToken.UnsafeRegister( + static (msgWrapper, cancellationToken) => ((MsgWrapper)msgWrapper!).Cancel(cancellationToken), this); + } + + public void MsgReceived(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer, NatsConnection connection) + { + if (_serializer == null || _type == null) + throw new NullReferenceException("Serializer must be set"); + var data = _serializer.Deserialize(payloadBuffer, _type); + _core.SetResult(data); + } + + public ValueTask MsgRetrieveAsync() => new(this, _core.Version); + + public object? GetResult(short token) + { + _ctr.Dispose(); + lock (this) + { + try + { + return _core.GetResult(token); + } + finally + { + _core.Reset(); + _serializer = default; + _type = default; + _ctr = default; + } + } + } + + public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token); + + public void OnCompleted( + Action continuation, + object? state, + short token, + ValueTaskSourceOnCompletedFlags flags) + => _core.OnCompleted(continuation, state, token, flags); + + private void Cancel(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + _core.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new OperationCanceledException(cancellationToken))); + } + } +} diff --git a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs index 524dd6d57..343a4d71f 100644 --- a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs @@ -182,7 +182,7 @@ private async Task ReadLoopAsync() } var msgHeader = buffer.Slice(0, positionBeforePayload.Value); - var (subject, sid, payloadLength, replyTo, reponseId) = ParseMessageHeader(msgHeader); + var (subject, sid, payloadLength, replyTo) = ParseMessageHeader(msgHeader); if (payloadLength == 0) { @@ -432,7 +432,7 @@ private async ValueTask> DispatchCommandAsync(int code, R // https://docs.nats.io/reference/reference-protocols/nats-protocol#msg // MSG [reply-to] <#bytes>\r\n[payload] - private (string subject, int sid, int payloadLength, string? replyTo, int? responseId) ParseMessageHeader(ReadOnlySpan msgHeader) + private (string subject, int sid, int payloadLength, string? replyTo) ParseMessageHeader(ReadOnlySpan msgHeader) { msgHeader = msgHeader.Slice(4); Split(msgHeader, out var subjectBytes, out msgHeader); @@ -443,24 +443,9 @@ private async ValueTask> DispatchCommandAsync(int code, R if (msgHeader.Length == 0) { - int? responseId = null; - - // Parse: _INBOX.RANDOM-GUID.ID - if (subjectBytes.StartsWith(_connection.InboxPrefix.Span)) - { - var lastIndex = subjectBytes.LastIndexOf((byte)'.'); - if (lastIndex != -1) - { - if (Utf8Parser.TryParse(subjectBytes.Slice(lastIndex + 1), out int id, out _)) - { - responseId = id; - } - } - } - var sid = GetInt32(sidBytes); var size = GetInt32(replyToOrSizeBytes); - return (subject, sid, size, null, responseId); + return (subject, sid, size, null); } else { @@ -470,11 +455,11 @@ private async ValueTask> DispatchCommandAsync(int code, R var sid = GetInt32(sidBytes); var payloadLength = GetInt32(bytesSlice); var replyTo = Encoding.ASCII.GetString(replyToBytes); - return (subject, sid, payloadLength, replyTo, null); + return (subject, sid, payloadLength, replyTo); } } - private (string subject, int sid, int payloadLength, string? replyTo, int? responseId) ParseMessageHeader(in ReadOnlySequence msgHeader) + private (string subject, int sid, int payloadLength, string? replyTo) ParseMessageHeader(in ReadOnlySequence msgHeader) { if (msgHeader.IsSingleSegment) { diff --git a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs index 2192085f0..ef554a07f 100644 --- a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs +++ b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs @@ -11,7 +11,7 @@ internal ValueTask PubAsync(string subject, string? replyTo = default, ReadOnlyS if (ConnectionState == NatsConnectionState.Open) { - var command = AsyncPublishBytesCommand.Create(_pool, GetCommandTimer(cancellationToken), subject, replyTo, headers, payload); + var command = AsyncPublishBytesCommand.Create(_pool, GetCancellationTimer(cancellationToken), subject, replyTo, headers, payload); if (TryEnqueueCommand(command)) { return command.AsValueTask(); @@ -25,19 +25,19 @@ internal ValueTask PubAsync(string subject, string? replyTo = default, ReadOnlyS { return WithConnectAsync(subject, replyTo, headers, payload, cancellationToken, static (self, s, r, h, p, token) => { - var command = AsyncPublishBytesCommand.Create(self._pool, self.GetCommandTimer(token), s, r, h, p); + var command = AsyncPublishBytesCommand.Create(self._pool, self.GetCancellationTimer(token), s, r, h, p); return self.EnqueueAndAwaitCommandAsync(command); }); } } - internal ValueTask PubModelAsync(string subject, T data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) + internal ValueTask PubModelAsync(string subject, T? data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) { headers?.SetReadOnly(); if (ConnectionState == NatsConnectionState.Open) { - var command = AsyncPublishCommand.Create(_pool, GetCommandTimer(cancellationToken), subject, replyTo, headers, data, serializer); + var command = AsyncPublishCommand.Create(_pool, GetCancellationTimer(cancellationToken), subject, replyTo, headers, data, serializer); if (TryEnqueueCommand(command)) { return command.AsValueTask(); @@ -51,7 +51,7 @@ internal ValueTask PubModelAsync(string subject, T data, INatsSerializer seri { return WithConnectAsync(subject, replyTo, headers, data, serializer, cancellationToken, static (self, s, r, h, v, ser, token) => { - var command = AsyncPublishCommand.Create(self._pool, self.GetCommandTimer(token), s, r, h, v, ser); + var command = AsyncPublishCommand.Create(self._pool, self.GetCancellationTimer(token), s, r, h, v, ser); return self.EnqueueAndAwaitCommandAsync(command); }); } diff --git a/src/NATS.Client.Core/NatsConnection.Ping.cs b/src/NATS.Client.Core/NatsConnection.Ping.cs index bf0db5c35..20946121f 100644 --- a/src/NATS.Client.Core/NatsConnection.Ping.cs +++ b/src/NATS.Client.Core/NatsConnection.Ping.cs @@ -9,7 +9,7 @@ public ValueTask PingAsync(CancellationToken cancellationToken = defau { if (ConnectionState == NatsConnectionState.Open) { - var command = AsyncPingCommand.Create(this, _pool, GetCommandTimer(cancellationToken)); + var command = AsyncPingCommand.Create(this, _pool, GetCancellationTimer(cancellationToken)); if (TryEnqueueCommand(command)) { return command.AsValueTask(); @@ -23,7 +23,7 @@ public ValueTask PingAsync(CancellationToken cancellationToken = defau { return WithConnectAsync(cancellationToken, static (self, token) => { - var command = AsyncPingCommand.Create(self, self._pool, self.GetCommandTimer(token)); + var command = AsyncPingCommand.Create(self, self._pool, self.GetCancellationTimer(token)); return self.EnqueueAndAwaitCommandAsync(command); }); } @@ -33,11 +33,11 @@ private void PostPing(CancellationToken cancellationToken = default) { if (ConnectionState == NatsConnectionState.Open) { - EnqueueCommandSync(PingCommand.Create(_pool, GetCommandTimer(cancellationToken))); + EnqueueCommandSync(PingCommand.Create(_pool, GetCancellationTimer(cancellationToken))); } else { - WithConnect(cancellationToken, static (self, token) => self.EnqueueCommandSync(PingCommand.Create(self._pool, self.GetCommandTimer(token)))); + WithConnect(cancellationToken, static (self, token) => self.EnqueueCommandSync(PingCommand.Create(self._pool, self.GetCancellationTimer(token)))); } } } diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index a41ebbc6c..214fcfe75 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -17,7 +17,7 @@ public ValueTask PublishAsync(NatsMsg msg, CancellationToken cancellationToken = } /// - public ValueTask PublishAsync(string subject, T data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(string subject, T? data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { var serializer = opts?.Serializer ?? Options.Serializer; return PubModelAsync(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken); diff --git a/src/NATS.Client.Core/NatsConnection.Util.cs b/src/NATS.Client.Core/NatsConnection.Util.cs index 7b2bfa74b..ee96ed95e 100644 --- a/src/NATS.Client.Core/NatsConnection.Util.cs +++ b/src/NATS.Client.Core/NatsConnection.Util.cs @@ -66,16 +66,18 @@ internal void PostDirectWrite(DirectWriteCommand command) } } - private async ValueTask EnqueueAndAwaitCommandAsync(IAsyncCommand command) + [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] + internal CancellationTimer GetCancellationTimer(CancellationToken cancellationToken, TimeSpan timeout = default) { - await EnqueueCommandAsync(command).ConfigureAwait(false); - await command.AsValueTask().ConfigureAwait(false); + if (timeout == default) + timeout = Options.CommandTimeout; + return _cancellationTimerPool.Start(timeout, cancellationToken); } - [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] - private CancellationTimer GetCommandTimer(CancellationToken cancellationToken) + private async ValueTask EnqueueAndAwaitCommandAsync(IAsyncCommand command) { - return _cancellationTimerPool.Start(Options.CommandTimeout, cancellationToken); + await EnqueueCommandAsync(command).ConfigureAwait(false); + await command.AsValueTask().ConfigureAwait(false); } [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index f1e2924a5..13144fbb1 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -25,7 +25,6 @@ public partial class NatsConnection : IAsyncDisposable, INatsConnection public Func<(string Host, int Port), ValueTask<(string Host, int Port)>>? OnConnectingAsync; internal readonly ConnectionStatsCounter Counter; // allow to call from external sources - internal readonly ReadOnlyMemory InboxPrefix; #pragma warning restore SA1401 private readonly object _gate = new object(); private readonly WriterState _writerState; @@ -71,10 +70,10 @@ public NatsConnection(NatsOptions options) _writerState = new WriterState(options); _commandWriter = _writerState.CommandBuffer.Writer; _subscriptionManager = new SubscriptionManager(this); - InboxPrefix = Encoding.ASCII.GetBytes($"{options.InboxPrefix}{Guid.NewGuid()}."); _logger = options.LoggerFactory.CreateLogger(); _clientOptions = new ClientOptions(Options); HeaderParser = new HeaderParser(options.HeaderEncoding); + InboxSubscriber = new InboxSubscriber(this, prefix1: options.InboxPrefix); } // events @@ -92,6 +91,10 @@ public NatsConnection(NatsOptions options) public HeaderParser HeaderParser { get; } + internal InboxSubscriber InboxSubscriber { get; } + + internal ObjectPool ObjectPool => _pool; + /// /// Connect socket and write CONNECT command to nats server. /// @@ -183,13 +186,13 @@ internal async ValueTask EnqueueAndAwaitCommandAsync(IAsyncCommand comm internal ValueTask PostPongAsync() { - return EnqueueCommandAsync(PongCommand.Create(_pool, GetCommandTimer(CancellationToken.None))); + return EnqueueCommandAsync(PongCommand.Create(_pool, GetCancellationTimer(CancellationToken.None))); } // called only internally internal ValueTask SubscribeCoreAsync(int sid, string subject, string? queueGroup, CancellationToken cancellationToken) { - var command = AsyncSubscribeCommand.Create(_pool, GetCommandTimer(cancellationToken), sid, subject, queueGroup); + var command = AsyncSubscribeCommand.Create(_pool, GetCancellationTimer(cancellationToken), sid, subject, queueGroup); return EnqueueAndAwaitCommandAsync(command); } @@ -379,15 +382,15 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect) // add CONNECT and PING command to priority lane _writerState.PriorityCommands.Clear(); - var connectCommand = AsyncConnectCommand.Create(_pool, _clientOptions, GetCommandTimer(CancellationToken.None)); + var connectCommand = AsyncConnectCommand.Create(_pool, _clientOptions, GetCancellationTimer(CancellationToken.None)); _writerState.PriorityCommands.Add(connectCommand); - _writerState.PriorityCommands.Add(PingCommand.Create(_pool, GetCommandTimer(CancellationToken.None))); + _writerState.PriorityCommands.Add(PingCommand.Create(_pool, GetCancellationTimer(CancellationToken.None))); if (reconnect) { // Add SUBSCRIBE command to priority lane var subscribeCommand = - AsyncSubscribeBatchCommand.Create(_pool, GetCommandTimer(CancellationToken.None), _subscriptionManager.GetExistingSubscriptions().ToArray()); + AsyncSubscribeBatchCommand.Create(_pool, GetCancellationTimer(CancellationToken.None), _subscriptionManager.GetExistingSubscriptions().ToArray()); _writerState.PriorityCommands.Add(subscribeCommand); } diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index e7bba8442..4d69207f1 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -10,6 +10,30 @@ public readonly record struct NatsMsg( ReadOnlyMemory Data, INatsConnection? Connection) { + internal static NatsMsg Build( + string subject, + string? replyTo, + in ReadOnlySequence? headersBuffer, + in ReadOnlySequence payloadBuffer, + INatsConnection? connection, + HeaderParser headerParser) + { + NatsHeaders? headers = null; + + if (headersBuffer != null) + { + headers = new NatsHeaders(); + if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) + { + throw new NatsException("Error parsing headers"); + } + + headers.SetReadOnly(); + } + + return new NatsMsg(subject, replyTo, headers, payloadBuffer.ToArray(), connection); + } + public ValueTask ReplyAsync(ReadOnlySequence data = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); @@ -44,6 +68,33 @@ public readonly record struct NatsMsg( T? Data, INatsConnection? Connection) { + internal static NatsMsg Build( + string subject, + string? replyTo, + in ReadOnlySequence? headersBuffer, + in ReadOnlySequence payloadBuffer, + INatsConnection? connection, + HeaderParser headerParser, + INatsSerializer serializer) + { + var data = serializer.Deserialize(payloadBuffer); + + NatsHeaders? headers = null; + + if (headersBuffer != null) + { + headers = new NatsHeaders(); + if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) + { + throw new NatsException("Error parsing headers"); + } + + headers.SetReadOnly(); + } + + return new NatsMsg(subject, replyTo, headers, data, connection); + } + public ValueTask ReplyAsync(TReply data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); diff --git a/src/NATS.Client.Core/NatsOptions.cs b/src/NATS.Client.Core/NatsOptions.cs index 2b1d8d195..e2b92dfb9 100644 --- a/src/NATS.Client.Core/NatsOptions.cs +++ b/src/NATS.Client.Core/NatsOptions.cs @@ -74,7 +74,7 @@ public sealed record NatsOptions WriterBufferSize: 65534, // 32767 ReaderBufferSize: 1048576, UseThreadPoolCallback: false, - InboxPrefix: "_INBOX.", + InboxPrefix: "_INBOX", NoRandomize: false, PingInterval: TimeSpan.FromMinutes(2), MaxPingOut: 2, diff --git a/src/NATS.Client.Core/NatsReplyHandle.cs b/src/NATS.Client.Core/NatsReplyHandle.cs index edab786d5..fd40f39ab 100644 --- a/src/NATS.Client.Core/NatsReplyHandle.cs +++ b/src/NATS.Client.Core/NatsReplyHandle.cs @@ -1,10 +1,26 @@ -using System.Text; - namespace NATS.Client.Core; +public readonly struct NatsReplyHandle : IAsyncDisposable +{ + private readonly NatsSubBase _sub; + private readonly Task _reader; + + internal NatsReplyHandle(NatsSubBase sub, Task reader) + { + _sub = sub; + _reader = reader; + } + + public async ValueTask DisposeAsync() + { + await _sub.DisposeAsync().ConfigureAwait(false); + await _reader.ConfigureAwait(false); + } +} + public static class NatReplyUtils { - public static async Task ReplyAsync(this INatsConnection nats, string subject, Func reply) + public static async Task ReplyAsync(this INatsConnection nats, string subject, Func reply) { var sub = await nats.SubscribeAsync(subject).ConfigureAwait(false); var reader = Task.Run(async () => @@ -25,55 +41,36 @@ public static async Task ReplyAsync(this I return new NatsReplyHandle(sub, reader); } - public static async Task RequestAsync(this INatsConnection nats, string subject, TRequest request) + public static async ValueTask RequestAsync(this NatsConnection nats, string subject, TRequest data, CancellationToken cancellationToken = default, TimeSpan timeout = default) { - var bs = ((NatsConnection)nats).InboxPrefix.ToArray(); // TODO: Fix inbox prefix - var replyTo = $"{Encoding.ASCII.GetString(bs)}{Guid.NewGuid():N}"; + var serializer = nats.Options.Serializer; + var inboxSubscriber = nats.InboxSubscriber; + await inboxSubscriber.EnsureStartedAsync().ConfigureAwait(false); - // TODO: Optimize by using connection wide inbox subscriber - var sub = await nats.SubscribeAsync(replyTo).ConfigureAwait(false); + if (!nats.ObjectPool.TryRent(out var wrapper)) + { + wrapper = new MsgWrapper(); + } - await nats.PublishAsync(subject, request, new NatsPubOpts { ReplyTo = replyTo }).ConfigureAwait(false); + var cancellationTimer = nats.GetCancellationTimer(cancellationToken, timeout); + wrapper.SetSerializer(serializer, cancellationTimer.Token); - return await Task.Run(async () => + var replyTo = inboxSubscriber.Register(wrapper); + try { - try - { - // TODO: Implement configurable request timeout - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token)) - { - return msg.Data; - } + await nats.PubModelAsync(subject, data, serializer, replyTo, cancellationToken: cancellationToken) + .ConfigureAwait(false); - throw new NatsException("Request-reply subscriber closed unexpectedly"); - } - catch (OperationCanceledException e) - { - throw new TimeoutException("Request-reply timed-out", e); - } - finally - { - await sub.DisposeAsync().ConfigureAwait(false); - } - }).ConfigureAwait(false); - } -} + var dataReply = await wrapper.MsgRetrieveAsync().ConfigureAwait(false); -public class NatsReplyHandle : IAsyncDisposable -{ - private readonly NatsSubBase _sub; - private readonly Task _reader; + nats.ObjectPool.Return(wrapper); + cancellationTimer.TryReturn(); - internal NatsReplyHandle(NatsSubBase sub, Task reader) - { - _sub = sub; - _reader = reader; - } - - public async ValueTask DisposeAsync() - { - await _sub.DisposeAsync().ConfigureAwait(false); - await _reader.ConfigureAwait(false); + return (TReply?)dataReply; + } + finally + { + inboxSubscriber.Unregister(replyTo); + } } } diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSub.cs index 5cbf58a15..43066b56c 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSub.cs @@ -57,19 +57,15 @@ public override ValueTask DisposeAsync() public override ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) { - NatsHeaders? natsHeaders = null; - if (headersBuffer != null) - { - natsHeaders = new NatsHeaders(); - if (!Connection.HeaderParser.ParseHeaders(new SequenceReader(headersBuffer.Value), natsHeaders)) - { - throw new NatsException("Error parsing headers"); - } - - natsHeaders.SetReadOnly(); - } - - return _msgs.Writer.WriteAsync(new NatsMsg(subject, replyTo, natsHeaders, payloadBuffer.ToArray(), Connection)); + var natsMsg = NatsMsg.Build( + subject, + replyTo, + headersBuffer, + payloadBuffer, + Connection, + Connection.HeaderParser); + + return _msgs.Writer.WriteAsync(natsMsg); } } @@ -98,21 +94,15 @@ public override ValueTask DisposeAsync() public override ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) { - var serializer = Serializer; - var data = serializer.Deserialize(payloadBuffer); - - NatsHeaders? natsHeaders = null; - if (headersBuffer != null) - { - natsHeaders = new NatsHeaders(); - if (!Connection.HeaderParser.ParseHeaders(new SequenceReader(headersBuffer.Value), natsHeaders)) - { - throw new NatsException("Error parsing headers"); - } - - natsHeaders.SetReadOnly(); - } - - return _msgs.Writer.WriteAsync(new NatsMsg(subject, replyTo, natsHeaders, data, Connection)); + var natsMsg = NatsMsg.Build( + subject, + replyTo, + headersBuffer, + payloadBuffer, + Connection, + Connection.HeaderParser, + Serializer); + + return _msgs.Writer.WriteAsync(natsMsg); } } diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs index 5275bed9d..086bcbfbc 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs @@ -153,30 +153,3 @@ public async Task UserCredentialAuthTest(string name, string serverConfig, NatsO await register; } } - -internal static class NatsMsgTestUtils -{ - internal static Task Register(this NatsSub? sub, Action> action) - { - if (sub == null) return Task.CompletedTask; - return Task.Run(async () => - { - await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) - { - action(natsMsg); - } - }); - } - - internal static Task Register(this NatsSub? sub, Action action) - { - if (sub == null) return Task.CompletedTask; - return Task.Run(async () => - { - await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) - { - action(natsMsg); - } - }); - } -} diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs index 6916c930d..81352f805 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs @@ -67,13 +67,13 @@ public async Task EncodingTest() var key = Guid.NewGuid().ToString(); - var actual = new List(); + var actual = new List(); var signalComplete = new WaitSignal(); var sub = await subConnection.SubscribeAsync(key); var register = sub.Register(x => { actual.Add(x.Data); - if (x.Data.Id == 30) + if (x.Data?.Id == 30) signalComplete.Pulse(); }); await subConnection.PingAsync(); // wait for subscribe complete @@ -138,9 +138,9 @@ await Retry.Until( // }); // timeout check - await Assert.ThrowsAsync(async () => + await Assert.ThrowsAsync(async () => { - await pubConnection.RequestAsync("foo", 10); + await pubConnection.RequestAsync("foo", 10, timeout: TimeSpan.FromSeconds(2)); }); } diff --git a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs new file mode 100644 index 000000000..3450a1268 --- /dev/null +++ b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs @@ -0,0 +1,26 @@ +namespace NATS.Client.Core.Tests; + +public class RequestReplyTest +{ + [Fact] + public async Task Simple_request_reply_test() + { + await using var server = new NatsServer(); + await using var nats = server.CreateClientConnection(); + + var sub = await nats.SubscribeAsync("foo"); + var reg = sub.Register(async msg => + { + await msg.ReplyAsync(msg.Data * 2); + }); + + for (int i = 0; i < 10; i++) + { + var rep = await nats.RequestAsync("foo", i); + Assert.Equal(i * 2, rep); + } + + await sub.DisposeAsync(); + await reg; + } +} diff --git a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs index fcaf5cd5a..7293d5005 100644 --- a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs +++ b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs @@ -61,6 +61,12 @@ async Task Isolator() await Retry.Until( "unsubscribe message received", () => proxy.ClientFrames.Count(f => f.Message.StartsWith("UNSUB")) == 1, - async () => await nats.PublishAsync("foo", 1)); + async () => + { + GC.Collect(); + await nats.PublishAsync("foo", 1); + }, + timeout: TimeSpan.FromSeconds(30), + retryDelay: TimeSpan.FromSeconds(.5)); } } diff --git a/tests/NATS.Client.Core.Tests/_Utils.cs b/tests/NATS.Client.Core.Tests/_Utils.cs index e1095c807..81070e933 100644 --- a/tests/NATS.Client.Core.Tests/_Utils.cs +++ b/tests/NATS.Client.Core.Tests/_Utils.cs @@ -43,3 +43,54 @@ public static void WaitForTcpPortToClose(int port) } } } + +internal static class NatsMsgTestUtils +{ + internal static Task Register(this NatsSub? sub, Action> action) + { + if (sub == null) return Task.CompletedTask; + return Task.Run(async () => + { + await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) + { + action(natsMsg); + } + }); + } + + internal static Task Register(this NatsSub? sub, Func, Task> action) + { + if (sub == null) return Task.CompletedTask; + return Task.Run(async () => + { + await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) + { + await action(natsMsg); + } + }); + } + + internal static Task Register(this NatsSub? sub, Action action) + { + if (sub == null) return Task.CompletedTask; + return Task.Run(async () => + { + await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) + { + action(natsMsg); + } + }); + } + + internal static Task Register(this NatsSub? sub, Func action) + { + if (sub == null) return Task.CompletedTask; + return Task.Run(async () => + { + await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) + { + await action(natsMsg); + } + }); + } +}