Skip to content

Commit

Permalink
Inbox subscriber implementation (#69)
Browse files Browse the repository at this point in the history
* Inbox subscriber implementation

* Removed old inbox code

* Test resilience
  • Loading branch information
mtmk committed Jun 30, 2023
1 parent 3d43304 commit 0b3cffd
Show file tree
Hide file tree
Showing 17 changed files with 445 additions and 150 deletions.
8 changes: 8 additions & 0 deletions src/NATS.Client.Core/INatsSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public interface INatsSerializer
int Serialize<T>(ICountableBufferWriter bufferWriter, T? value);

T? Deserialize<T>(in ReadOnlySequence<byte> buffer);

object? Deserialize(in ReadOnlySequence<byte> buffer, Type type);
}

public interface ICountableBufferWriter : IBufferWriter<byte>
Expand Down Expand Up @@ -63,6 +65,12 @@ public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
return JsonSerializer.Deserialize<T>(ref reader, _options);
}

public object? Deserialize(in ReadOnlySequence<byte> buffer, Type type)
{
var reader = new Utf8JsonReader(buffer); // Utf8JsonReader is ref struct, no allocate.
return JsonSerializer.Deserialize(ref reader, type, _options);
}

private sealed class NullBufferWriter : IBufferWriter<byte>
{
internal static readonly IBufferWriter<byte> Instance = new NullBufferWriter();
Expand Down
203 changes: 203 additions & 0 deletions src/NATS.Client.Core/InboxSubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
using System.Buffers;
using System.Collections.Concurrent;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks.Sources;
using Microsoft.Extensions.Logging;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;

internal class InboxSubscriber : INatsSubBuilder<InboxSub>, IAsyncDisposable
{
private readonly ILogger<InboxSubscriber> _logger;
private readonly string? _queueGroup;
private readonly ConcurrentDictionary<string, MsgWrapper> _writers = new();
private readonly string _prefix;
private InboxSub? _sub;
private bool _started;

public InboxSubscriber(
NatsConnection connection,
string? queueGroup = default,
string prefix1 = "_INBOX",
string? prefix2 = default)
{
_logger = connection.Options.LoggerFactory.CreateLogger<InboxSubscriber>();
_queueGroup = queueGroup;
prefix2 ??= Guid.NewGuid().ToString("N");
_prefix = $"{prefix1}.{prefix2}";
Connection = connection;
}

private NatsConnection Connection { get; }

public async ValueTask EnsureStartedAsync()
{
// Only one call to this method can start the subscription.
lock (this)
{
if (_started)
{
return;
}

_started = true;
}

try
{
_sub = await Connection.SubAsync($"{_prefix}.*", _queueGroup, this)
.ConfigureAwait(false);
}
catch
{
// There is a race here when there are two or more calls to this method and
// the first one fails to subscribe, other calls might carry on without exception.
// While first call would produce the correct exception, subsequent calls will only
// fail with a timeout. We reset here to allow retries to subscribe again.
lock (this) _started = false;
throw;
}
}

public InboxSub Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager)
{
var sid = manager.GetNextSid();
return new InboxSub(this, subject, queueGroup, sid, connection, manager);
}

public string Register(MsgWrapper msg, string? suffix = null)
{
suffix ??= Guid.NewGuid().ToString("N");
var subject = $"{_prefix}.{suffix}";
if (!_writers.TryAdd(subject, msg))
throw new InvalidOperationException("Subject already registered");
return subject;
}

public void Unregister(string subject) => _writers.TryRemove(subject, out _);

public void Received(string subject, string? replyTo, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
{
if (!_writers.TryGetValue(subject, out var msgWrapper))
{
_logger.LogWarning("Unregistered message inbox received");
return;
}

msgWrapper.MsgReceived(subject, replyTo, headersBuffer, payloadBuffer, connection);
}

public ValueTask DisposeAsync()
{
if (_sub != null)
return _sub.DisposeAsync();
return ValueTask.CompletedTask;
}
}

internal class InboxSub : INatsSub
{
private readonly InboxSubscriber _inbox;
private readonly NatsConnection _connection;
private readonly SubscriptionManager _manager;

public InboxSub(
InboxSubscriber inbox,
string subject,
string? queueGroup,
int sid,
NatsConnection connection,
SubscriptionManager manager)
{
_inbox = inbox;
_connection = connection;
_manager = manager;
Subject = subject;
QueueGroup = queueGroup;
Sid = sid;
}

public string Subject { get; }

public string? QueueGroup { get; }

public int Sid { get; }

public ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer)
{
_inbox.Received(subject, replyTo, headersBuffer, payloadBuffer, _connection);
return ValueTask.CompletedTask;
}

public ValueTask DisposeAsync() => _manager.RemoveAsync(Sid);
}

internal class MsgWrapper : IValueTaskSource<object?>, IObjectPoolNode<MsgWrapper>
{
private ManualResetValueTaskSourceCore<object?> _core = new()
{
RunContinuationsAsynchronously = true,
};

private INatsSerializer? _serializer;
private Type? _type;
private MsgWrapper? _next;
private CancellationTokenRegistration _ctr;

public ref MsgWrapper? NextNode => ref _next;

public void SetSerializer<TData>(INatsSerializer serializer, CancellationToken cancellationToken)
{
_serializer = serializer;
_type = typeof(TData);
_ctr = cancellationToken.UnsafeRegister(
static (msgWrapper, cancellationToken) => ((MsgWrapper)msgWrapper!).Cancel(cancellationToken), this);
}

public void MsgReceived(string subject, string? replyTo, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
{
if (_serializer == null || _type == null)
throw new NullReferenceException("Serializer must be set");
var data = _serializer.Deserialize(payloadBuffer, _type);
_core.SetResult(data);
}

public ValueTask<object?> MsgRetrieveAsync() => new(this, _core.Version);

public object? GetResult(short token)
{
_ctr.Dispose();
lock (this)
{
try
{
return _core.GetResult(token);
}
finally
{
_core.Reset();
_serializer = default;
_type = default;
_ctr = default;
}
}
}

public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);

public void OnCompleted(
Action<object?> continuation,
object? state,
short token,
ValueTaskSourceOnCompletedFlags flags)
=> _core.OnCompleted(continuation, state, token, flags);

private void Cancel(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
_core.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new OperationCanceledException(cancellationToken)));
}
}
}
25 changes: 5 additions & 20 deletions src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private async Task ReadLoopAsync()
}

var msgHeader = buffer.Slice(0, positionBeforePayload.Value);
var (subject, sid, payloadLength, replyTo, reponseId) = ParseMessageHeader(msgHeader);
var (subject, sid, payloadLength, replyTo) = ParseMessageHeader(msgHeader);

if (payloadLength == 0)
{
Expand Down Expand Up @@ -432,7 +432,7 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R

// https://docs.nats.io/reference/reference-protocols/nats-protocol#msg
// MSG <subject> <sid> [reply-to] <#bytes>\r\n[payload]
private (string subject, int sid, int payloadLength, string? replyTo, int? responseId) ParseMessageHeader(ReadOnlySpan<byte> msgHeader)
private (string subject, int sid, int payloadLength, string? replyTo) ParseMessageHeader(ReadOnlySpan<byte> msgHeader)
{
msgHeader = msgHeader.Slice(4);
Split(msgHeader, out var subjectBytes, out msgHeader);
Expand All @@ -443,24 +443,9 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R

if (msgHeader.Length == 0)
{
int? responseId = null;

// Parse: _INBOX.RANDOM-GUID.ID
if (subjectBytes.StartsWith(_connection.InboxPrefix.Span))
{
var lastIndex = subjectBytes.LastIndexOf((byte)'.');
if (lastIndex != -1)
{
if (Utf8Parser.TryParse(subjectBytes.Slice(lastIndex + 1), out int id, out _))
{
responseId = id;
}
}
}

var sid = GetInt32(sidBytes);
var size = GetInt32(replyToOrSizeBytes);
return (subject, sid, size, null, responseId);
return (subject, sid, size, null);
}
else
{
Expand All @@ -470,11 +455,11 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
var sid = GetInt32(sidBytes);
var payloadLength = GetInt32(bytesSlice);
var replyTo = Encoding.ASCII.GetString(replyToBytes);
return (subject, sid, payloadLength, replyTo, null);
return (subject, sid, payloadLength, replyTo);
}
}

private (string subject, int sid, int payloadLength, string? replyTo, int? responseId) ParseMessageHeader(in ReadOnlySequence<byte> msgHeader)
private (string subject, int sid, int payloadLength, string? replyTo) ParseMessageHeader(in ReadOnlySequence<byte> msgHeader)
{
if (msgHeader.IsSingleSegment)
{
Expand Down
10 changes: 5 additions & 5 deletions src/NATS.Client.Core/NatsConnection.LowLevelApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ internal ValueTask PubAsync(string subject, string? replyTo = default, ReadOnlyS

if (ConnectionState == NatsConnectionState.Open)
{
var command = AsyncPublishBytesCommand.Create(_pool, GetCommandTimer(cancellationToken), subject, replyTo, headers, payload);
var command = AsyncPublishBytesCommand.Create(_pool, GetCancellationTimer(cancellationToken), subject, replyTo, headers, payload);
if (TryEnqueueCommand(command))
{
return command.AsValueTask();
Expand All @@ -25,19 +25,19 @@ internal ValueTask PubAsync(string subject, string? replyTo = default, ReadOnlyS
{
return WithConnectAsync(subject, replyTo, headers, payload, cancellationToken, static (self, s, r, h, p, token) =>
{
var command = AsyncPublishBytesCommand.Create(self._pool, self.GetCommandTimer(token), s, r, h, p);
var command = AsyncPublishBytesCommand.Create(self._pool, self.GetCancellationTimer(token), s, r, h, p);
return self.EnqueueAndAwaitCommandAsync(command);
});
}
}

internal ValueTask PubModelAsync<T>(string subject, T data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default)
internal ValueTask PubModelAsync<T>(string subject, T? data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default)
{
headers?.SetReadOnly();

if (ConnectionState == NatsConnectionState.Open)
{
var command = AsyncPublishCommand<T>.Create(_pool, GetCommandTimer(cancellationToken), subject, replyTo, headers, data, serializer);
var command = AsyncPublishCommand<T>.Create(_pool, GetCancellationTimer(cancellationToken), subject, replyTo, headers, data, serializer);
if (TryEnqueueCommand(command))
{
return command.AsValueTask();
Expand All @@ -51,7 +51,7 @@ internal ValueTask PubModelAsync<T>(string subject, T data, INatsSerializer seri
{
return WithConnectAsync(subject, replyTo, headers, data, serializer, cancellationToken, static (self, s, r, h, v, ser, token) =>
{
var command = AsyncPublishCommand<T>.Create(self._pool, self.GetCommandTimer(token), s, r, h, v, ser);
var command = AsyncPublishCommand<T>.Create(self._pool, self.GetCancellationTimer(token), s, r, h, v, ser);
return self.EnqueueAndAwaitCommandAsync(command);
});
}
Expand Down
8 changes: 4 additions & 4 deletions src/NATS.Client.Core/NatsConnection.Ping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public ValueTask<TimeSpan> PingAsync(CancellationToken cancellationToken = defau
{
if (ConnectionState == NatsConnectionState.Open)
{
var command = AsyncPingCommand.Create(this, _pool, GetCommandTimer(cancellationToken));
var command = AsyncPingCommand.Create(this, _pool, GetCancellationTimer(cancellationToken));
if (TryEnqueueCommand(command))
{
return command.AsValueTask();
Expand All @@ -23,7 +23,7 @@ public ValueTask<TimeSpan> PingAsync(CancellationToken cancellationToken = defau
{
return WithConnectAsync(cancellationToken, static (self, token) =>
{
var command = AsyncPingCommand.Create(self, self._pool, self.GetCommandTimer(token));
var command = AsyncPingCommand.Create(self, self._pool, self.GetCancellationTimer(token));
return self.EnqueueAndAwaitCommandAsync(command);
});
}
Expand All @@ -33,11 +33,11 @@ private void PostPing(CancellationToken cancellationToken = default)
{
if (ConnectionState == NatsConnectionState.Open)
{
EnqueueCommandSync(PingCommand.Create(_pool, GetCommandTimer(cancellationToken)));
EnqueueCommandSync(PingCommand.Create(_pool, GetCancellationTimer(cancellationToken)));
}
else
{
WithConnect(cancellationToken, static (self, token) => self.EnqueueCommandSync(PingCommand.Create(self._pool, self.GetCommandTimer(token))));
WithConnect(cancellationToken, static (self, token) => self.EnqueueCommandSync(PingCommand.Create(self._pool, self.GetCancellationTimer(token))));
}
}
}
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.Publish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ValueTask PublishAsync(NatsMsg msg, CancellationToken cancellationToken =
}

/// <inheritdoc />
public ValueTask PublishAsync<T>(string subject, T data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
public ValueTask PublishAsync<T>(string subject, T? data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
var serializer = opts?.Serializer ?? Options.Serializer;
return PubModelAsync<T>(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken);
Expand Down
14 changes: 8 additions & 6 deletions src/NATS.Client.Core/NatsConnection.Util.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,18 @@ internal void PostDirectWrite(DirectWriteCommand command)
}
}

private async ValueTask EnqueueAndAwaitCommandAsync(IAsyncCommand command)
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
internal CancellationTimer GetCancellationTimer(CancellationToken cancellationToken, TimeSpan timeout = default)
{
await EnqueueCommandAsync(command).ConfigureAwait(false);
await command.AsValueTask().ConfigureAwait(false);
if (timeout == default)
timeout = Options.CommandTimeout;
return _cancellationTimerPool.Start(timeout, cancellationToken);
}

[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
private CancellationTimer GetCommandTimer(CancellationToken cancellationToken)
private async ValueTask EnqueueAndAwaitCommandAsync(IAsyncCommand command)
{
return _cancellationTimerPool.Start(Options.CommandTimeout, cancellationToken);
await EnqueueCommandAsync(command).ConfigureAwait(false);
await command.AsValueTask().ConfigureAwait(false);
}

[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
Expand Down
Loading

0 comments on commit 0b3cffd

Please sign in to comment.