Skip to content

Commit

Permalink
Auto-unsubscribe implementations and fixes (#70)
Browse files Browse the repository at this point in the history
* Auto-unsubscribe on max-msgs, timeout and idle timeout
* Pass NatsMsg by ref to reduce copy overhead
* Hidden internal INatsSub API from public API using explicit
  interface implementations
* Test resilience
* Auto-unsubscribe reconnect test
  • Loading branch information
mtmk committed Jun 30, 2023
1 parent 0b3cffd commit 953692e
Show file tree
Hide file tree
Showing 17 changed files with 570 additions and 76 deletions.
3 changes: 2 additions & 1 deletion NATS.Client.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=CR/@EntryIndexedValue">CR</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=LF/@EntryIndexedValue">LF</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HMSG/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HPUB/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HPUB/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Msgs/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
10 changes: 9 additions & 1 deletion src/NATS.Client.Core/Commands/ProtocolWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void WritePublish<T>(string subject, string? replyTo, NatsHeaders? header

// https://docs.nats.io/reference/reference-protocols/nats-protocol#sub
// SUB <subject> [queue group] <sid>
public void WriteSubscribe(int sid, string subject, string? queueGroup)
public void WriteSubscribe(int sid, string subject, string? queueGroup, int? maxMsgs)
{
var offset = 0;

Expand Down Expand Up @@ -149,6 +149,14 @@ public void WriteSubscribe(int sid, string subject, string? queueGroup)
offset += CommandConstants.NewLine.Length;

_writer.Advance(offset);

// Immediately send UNSUB <sid> <max-msgs> to minimize the risk of
// receiving more messages than <max-msgs> in case they are published
// between our SUB and UNSUB calls.
if (maxMsgs != null)
{
WriteUnsubscribe(sid, maxMsgs);
}
}

// https://docs.nats.io/reference/reference-protocols/nats-protocol#unsub
Expand Down
14 changes: 8 additions & 6 deletions src/NATS.Client.Core/Commands/SubscribeCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ internal sealed class AsyncSubscribeCommand : AsyncCommandBase<AsyncSubscribeCom
private string? _subject;
private string? _queueGroup;
private int _sid;
private int? _maxMsgs;

private AsyncSubscribeCommand()
{
}

public static AsyncSubscribeCommand Create(ObjectPool pool, CancellationTimer timer, int sid, string subject, string? queueGroup)
public static AsyncSubscribeCommand Create(ObjectPool pool, CancellationTimer timer, int sid, string subject, string? queueGroup, int? maxMsgs)
{
if (!TryRent(pool, out var result))
{
Expand All @@ -22,14 +23,15 @@ public static AsyncSubscribeCommand Create(ObjectPool pool, CancellationTimer ti
result._subject = subject;
result._sid = sid;
result._queueGroup = queueGroup;
result._maxMsgs = maxMsgs;
result.SetCancellationTimer(timer);

return result;
}

public override void Write(ProtocolWriter writer)
{
writer.WriteSubscribe(_sid, _subject!, _queueGroup);
writer.WriteSubscribe(_sid, _subject!, _queueGroup, _maxMsgs);
}

protected override void Reset()
Expand All @@ -42,13 +44,13 @@ protected override void Reset()

internal sealed class AsyncSubscribeBatchCommand : AsyncCommandBase<AsyncSubscribeBatchCommand>, IBatchCommand
{
private (int sid, string subject, string? queueGroup)[]? _subscriptions;
private (int sid, string subject, string? queueGroup, int? maxMsgs)[]? _subscriptions;

private AsyncSubscribeBatchCommand()
{
}

public static AsyncSubscribeBatchCommand Create(ObjectPool pool, CancellationTimer timer, (int sid, string subject, string? queueGroup)[]? subscriptions)
public static AsyncSubscribeBatchCommand Create(ObjectPool pool, CancellationTimer timer, (int sid, string subject, string? queueGroup, int? maxMsgs)[]? subscriptions)
{
if (!TryRent(pool, out var result))
{
Expand All @@ -71,10 +73,10 @@ int IBatchCommand.Write(ProtocolWriter writer)
var i = 0;
if (_subscriptions != null)
{
foreach (var (id, subject, queue) in _subscriptions)
foreach (var (id, subject, queue, maxMsgs) in _subscriptions)
{
i++;
writer.WriteSubscribe(id, subject, queue);
writer.WriteSubscribe(id, subject, queue, maxMsgs);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public interface INatsConnection
/// <param name="msg">A <see cref="NatsMsg"/> representing message details.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync(NatsMsg msg, CancellationToken cancellationToken = default);
ValueTask PublishAsync(in NatsMsg msg, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
Expand All @@ -47,7 +47,7 @@ public interface INatsConnection
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be send to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(NatsMsg<T> msg, CancellationToken cancellationToken = default);
ValueTask PublishAsync<T>(in NatsMsg<T> msg, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
Expand Down
20 changes: 14 additions & 6 deletions src/NATS.Client.Core/INatsSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,33 @@ internal interface INatsSub : IAsyncDisposable

string? QueueGroup { get; }

int? PendingMsgs { get; }

int Sid { get; }

ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer);
/// <summary>
/// Called after subscription is sent to the server.
/// Helps maintain more accurate timeouts, especially idle timeout.
/// </summary>
void Ready();

ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer);
}

internal interface INatsSubBuilder<out T>
where T : INatsSub
{
T Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager);
T Build(string subject, NatsSubOpts? opts, NatsConnection connection, SubscriptionManager manager);
}

internal class NatsSubBuilder : INatsSubBuilder<NatsSub>
{
public static readonly NatsSubBuilder Default = new();

public NatsSub Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager)
public NatsSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, SubscriptionManager manager)
{
var sid = manager.GetNextSid();
return new NatsSub(connection, manager, subject, queueGroup, sid);
return new NatsSub(connection, manager, subject, opts, sid);
}
}

Expand All @@ -41,9 +49,9 @@ internal class NatsSubModelBuilder<T> : INatsSubBuilder<NatsSub<T>>
public static NatsSubModelBuilder<T> For(INatsSerializer serializer) =>
Cache.GetOrAdd(serializer, static s => new NatsSubModelBuilder<T>(s));

public NatsSub<T> Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager)
public NatsSub<T> Build(string subject, NatsSubOpts? opts, NatsConnection connection, SubscriptionManager manager)
{
var sid = manager.GetNextSid();
return new NatsSub<T>(connection, manager, subject, queueGroup, sid, _serializer);
return new NatsSub<T>(connection, manager, subject, opts, sid, _serializer);
}
}
19 changes: 13 additions & 6 deletions src/NATS.Client.Core/InboxSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async ValueTask EnsureStartedAsync()

try
{
_sub = await Connection.SubAsync($"{_prefix}.*", _queueGroup, this)
_sub = await Connection.SubAsync($"{_prefix}.*", new NatsSubOpts { QueueGroup = _queueGroup }, builder: this)
.ConfigureAwait(false);
}
catch
Expand All @@ -60,10 +60,10 @@ public async ValueTask EnsureStartedAsync()
}
}

public InboxSub Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager)
public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, SubscriptionManager manager)
{
var sid = manager.GetNextSid();
return new InboxSub(this, subject, queueGroup, sid, connection, manager);
return new InboxSub(this, subject, opts, sid, connection, manager);
}

public string Register(MsgWrapper msg, string? suffix = null)
Expand Down Expand Up @@ -105,7 +105,7 @@ internal class InboxSub : INatsSub
public InboxSub(
InboxSubscriber inbox,
string subject,
string? queueGroup,
NatsSubOpts? opts,
int sid,
NatsConnection connection,
SubscriptionManager manager)
Expand All @@ -114,17 +114,24 @@ public InboxSub(
_connection = connection;
_manager = manager;
Subject = subject;
QueueGroup = queueGroup;
QueueGroup = opts?.QueueGroup;
PendingMsgs = opts?.MaxMsgs;
Sid = sid;
}

public string Subject { get; }

public string? QueueGroup { get; }

public int? PendingMsgs { get; }

public int Sid { get; }

public ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer)
public void Ready()
{
}

public ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
{
_inbox.Received(subject, replyTo, headersBuffer, payloadBuffer, _connection);
return ValueTask.CompletedTask;
Expand Down
10 changes: 5 additions & 5 deletions src/NATS.Client.Core/NatsConnection.LowLevelApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,19 @@ internal ValueTask PubModelAsync<T>(string subject, T? data, INatsSerializer ser
}
}

internal ValueTask<T> SubAsync<T>(string subject, string? queueGroup, INatsSubBuilder<T> builder, CancellationToken cancellationToken = default)
internal ValueTask<T> SubAsync<T>(string subject, NatsSubOpts? opts, INatsSubBuilder<T> builder, CancellationToken cancellationToken = default)
where T : INatsSub
{
var sub = builder.Build(subject, queueGroup, this, _subscriptionManager);
var sub = builder.Build(subject, opts, connection: this, _subscriptionManager);
if (ConnectionState == NatsConnectionState.Open)
{
return _subscriptionManager.SubscribeAsync(subject, queueGroup, sub, cancellationToken);
return _subscriptionManager.SubscribeAsync(subject, opts, sub, cancellationToken);
}
else
{
return WithConnectAsync(subject, queueGroup, sub, cancellationToken, static (self, s, qg, sb, token) =>
return WithConnectAsync(subject, opts, sub, cancellationToken, static (self, s, o, sb, token) =>
{
return self._subscriptionManager.SubscribeAsync(s, qg, sb, token);
return self._subscriptionManager.SubscribeAsync(s, o, sb, token);
});
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/NatsConnection.Publish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = d
}

/// <inheritdoc />
public ValueTask PublishAsync(NatsMsg msg, CancellationToken cancellationToken = default)
public ValueTask PublishAsync(in NatsMsg msg, CancellationToken cancellationToken = default)
{
return PublishAsync(msg.Subject, msg.Data, default, cancellationToken);
}
Expand All @@ -24,7 +24,7 @@ public ValueTask PublishAsync<T>(string subject, T? data, in NatsPubOpts? opts =
}

/// <inheritdoc />
public ValueTask PublishAsync<T>(NatsMsg<T> msg, CancellationToken cancellationToken = default)
public ValueTask PublishAsync<T>(in NatsMsg<T> msg, CancellationToken cancellationToken = default)
{
return PublishAsync<T>(msg.Subject, msg.Data, default, cancellationToken);
}
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/NatsConnection.Subscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ public partial class NatsConnection
/// <inheritdoc />
public ValueTask<NatsSub> SubscribeAsync(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
{
return SubAsync<NatsSub>(subject, opts?.QueueGroup, NatsSubBuilder.Default, cancellationToken);
return SubAsync<NatsSub>(subject, opts, NatsSubBuilder.Default, cancellationToken);
}

/// <inheritdoc />
public ValueTask<NatsSub<T>> SubscribeAsync<T>(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
{
var serializer = opts?.Serializer ?? Options.Serializer;
return SubAsync<NatsSub<T>>(subject, opts?.QueueGroup, NatsSubModelBuilder<T>.For(serializer), cancellationToken);
return SubAsync<NatsSub<T>>(subject, opts, NatsSubModelBuilder<T>.For(serializer), cancellationToken);
}
}
10 changes: 8 additions & 2 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ internal ValueTask PostPongAsync()
}

// called only internally
internal ValueTask SubscribeCoreAsync(int sid, string subject, string? queueGroup, CancellationToken cancellationToken)
internal ValueTask SubscribeCoreAsync(int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken)
{
var command = AsyncSubscribeCommand.Create(_pool, GetCancellationTimer(cancellationToken), sid, subject, queueGroup);
var command = AsyncSubscribeCommand.Create(_pool, GetCancellationTimer(cancellationToken), sid, subject, queueGroup, maxMsgs);
return EnqueueAndAwaitCommandAsync(command);
}

Expand Down Expand Up @@ -777,6 +777,12 @@ private async ValueTask<TResult> WithConnectAsync<T1, T2, T3, T4, TResult>(T1 it
await ConnectAsync().ConfigureAwait(false);
return await coreAsync(this, item1, item2, item3, item4).ConfigureAwait(false);
}

private async ValueTask<TResult> WithConnectAsync<T1, T2, T3, T4, T5, TResult>(T1 item1, T2 item2, T3 item3, T4 item4, T5 item5, Func<NatsConnection, T1, T2, T3, T4, T5, ValueTask<TResult>> coreAsync)
{
await ConnectAsync().ConfigureAwait(false);
return await coreAsync(this, item1, item2, item3, item4, item5).ConfigureAwait(false);
}
}

// This writer state is reused when reconnecting.
Expand Down
Loading

0 comments on commit 953692e

Please sign in to comment.