Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto-unsubscribe implementations and fixes #70

Merged
merged 5 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion NATS.Client.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=CR/@EntryIndexedValue">CR</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=LF/@EntryIndexedValue">LF</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HMSG/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HPUB/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HPUB/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Msgs/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
10 changes: 9 additions & 1 deletion src/NATS.Client.Core/Commands/ProtocolWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void WritePublish<T>(string subject, string? replyTo, NatsHeaders? header

// https://docs.nats.io/reference/reference-protocols/nats-protocol#sub
// SUB <subject> [queue group] <sid>
public void WriteSubscribe(int sid, string subject, string? queueGroup)
public void WriteSubscribe(int sid, string subject, string? queueGroup, int? maxMsgs)
{
var offset = 0;

Expand Down Expand Up @@ -149,6 +149,14 @@ public void WriteSubscribe(int sid, string subject, string? queueGroup)
offset += CommandConstants.NewLine.Length;

_writer.Advance(offset);

// Immediately send UNSUB <sid> <max-msgs> to minimize the risk of
// receiving more messages than <max-msgs> in case they are published
// between our SUB and UNSUB calls.
if (maxMsgs != null)
{
WriteUnsubscribe(sid, maxMsgs);
}
}

// https://docs.nats.io/reference/reference-protocols/nats-protocol#unsub
Expand Down
14 changes: 8 additions & 6 deletions src/NATS.Client.Core/Commands/SubscribeCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ internal sealed class AsyncSubscribeCommand : AsyncCommandBase<AsyncSubscribeCom
private string? _subject;
private string? _queueGroup;
private int _sid;
private int? _maxMsgs;

private AsyncSubscribeCommand()
{
}

public static AsyncSubscribeCommand Create(ObjectPool pool, CancellationTimer timer, int sid, string subject, string? queueGroup)
public static AsyncSubscribeCommand Create(ObjectPool pool, CancellationTimer timer, int sid, string subject, string? queueGroup, int? maxMsgs)
{
if (!TryRent(pool, out var result))
{
Expand All @@ -22,14 +23,15 @@ public static AsyncSubscribeCommand Create(ObjectPool pool, CancellationTimer ti
result._subject = subject;
result._sid = sid;
result._queueGroup = queueGroup;
result._maxMsgs = maxMsgs;
result.SetCancellationTimer(timer);

return result;
}

public override void Write(ProtocolWriter writer)
{
writer.WriteSubscribe(_sid, _subject!, _queueGroup);
writer.WriteSubscribe(_sid, _subject!, _queueGroup, _maxMsgs);
}

protected override void Reset()
Expand All @@ -42,13 +44,13 @@ protected override void Reset()

internal sealed class AsyncSubscribeBatchCommand : AsyncCommandBase<AsyncSubscribeBatchCommand>, IBatchCommand
{
private (int sid, string subject, string? queueGroup)[]? _subscriptions;
private (int sid, string subject, string? queueGroup, int? maxMsgs)[]? _subscriptions;

private AsyncSubscribeBatchCommand()
{
}

public static AsyncSubscribeBatchCommand Create(ObjectPool pool, CancellationTimer timer, (int sid, string subject, string? queueGroup)[]? subscriptions)
public static AsyncSubscribeBatchCommand Create(ObjectPool pool, CancellationTimer timer, (int sid, string subject, string? queueGroup, int? maxMsgs)[]? subscriptions)
{
if (!TryRent(pool, out var result))
{
Expand All @@ -71,10 +73,10 @@ int IBatchCommand.Write(ProtocolWriter writer)
var i = 0;
if (_subscriptions != null)
{
foreach (var (id, subject, queue) in _subscriptions)
foreach (var (id, subject, queue, maxMsgs) in _subscriptions)
{
i++;
writer.WriteSubscribe(id, subject, queue);
writer.WriteSubscribe(id, subject, queue, maxMsgs);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public interface INatsConnection
/// <param name="msg">A <see cref="NatsMsg"/> representing message details.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync(NatsMsg msg, CancellationToken cancellationToken = default);
ValueTask PublishAsync(in NatsMsg msg, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
Expand All @@ -47,7 +47,7 @@ public interface INatsConnection
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be send to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(NatsMsg<T> msg, CancellationToken cancellationToken = default);
ValueTask PublishAsync<T>(in NatsMsg<T> msg, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
Expand Down
20 changes: 14 additions & 6 deletions src/NATS.Client.Core/INatsSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,33 @@ internal interface INatsSub : IAsyncDisposable

string? QueueGroup { get; }

int? PendingMsgs { get; }

int Sid { get; }

ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer);
/// <summary>
/// Called after subscription is sent to the server.
/// Helps maintain more accurate timeouts, especially idle timeout.
/// </summary>
void Ready();

ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer);
}

internal interface INatsSubBuilder<out T>
where T : INatsSub
{
T Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager);
T Build(string subject, NatsSubOpts? opts, NatsConnection connection, SubscriptionManager manager);
}

internal class NatsSubBuilder : INatsSubBuilder<NatsSub>
{
public static readonly NatsSubBuilder Default = new();

public NatsSub Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager)
public NatsSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, SubscriptionManager manager)
{
var sid = manager.GetNextSid();
return new NatsSub(connection, manager, subject, queueGroup, sid);
return new NatsSub(connection, manager, subject, opts, sid);
}
}

Expand All @@ -41,9 +49,9 @@ internal class NatsSubModelBuilder<T> : INatsSubBuilder<NatsSub<T>>
public static NatsSubModelBuilder<T> For(INatsSerializer serializer) =>
Cache.GetOrAdd(serializer, static s => new NatsSubModelBuilder<T>(s));

public NatsSub<T> Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager)
public NatsSub<T> Build(string subject, NatsSubOpts? opts, NatsConnection connection, SubscriptionManager manager)
{
var sid = manager.GetNextSid();
return new NatsSub<T>(connection, manager, subject, queueGroup, sid, _serializer);
return new NatsSub<T>(connection, manager, subject, opts, sid, _serializer);
}
}
19 changes: 13 additions & 6 deletions src/NATS.Client.Core/InboxSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async ValueTask EnsureStartedAsync()

try
{
_sub = await Connection.SubAsync($"{_prefix}.*", _queueGroup, this)
_sub = await Connection.SubAsync($"{_prefix}.*", new NatsSubOpts { QueueGroup = _queueGroup }, builder: this)
.ConfigureAwait(false);
}
catch
Expand All @@ -60,10 +60,10 @@ public async ValueTask EnsureStartedAsync()
}
}

public InboxSub Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager)
public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, SubscriptionManager manager)
{
var sid = manager.GetNextSid();
return new InboxSub(this, subject, queueGroup, sid, connection, manager);
return new InboxSub(this, subject, opts, sid, connection, manager);
}

public string Register(MsgWrapper msg, string? suffix = null)
Expand Down Expand Up @@ -105,7 +105,7 @@ internal class InboxSub : INatsSub
public InboxSub(
InboxSubscriber inbox,
string subject,
string? queueGroup,
NatsSubOpts? opts,
int sid,
NatsConnection connection,
SubscriptionManager manager)
Expand All @@ -114,17 +114,24 @@ public InboxSub(
_connection = connection;
_manager = manager;
Subject = subject;
QueueGroup = queueGroup;
QueueGroup = opts?.QueueGroup;
PendingMsgs = opts?.MaxMsgs;
Sid = sid;
}

public string Subject { get; }

public string? QueueGroup { get; }

public int? PendingMsgs { get; }

public int Sid { get; }

public ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer)
public void Ready()
{
}

public ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
{
_inbox.Received(subject, replyTo, headersBuffer, payloadBuffer, _connection);
return ValueTask.CompletedTask;
Expand Down
10 changes: 5 additions & 5 deletions src/NATS.Client.Core/NatsConnection.LowLevelApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,19 @@ internal ValueTask PubModelAsync<T>(string subject, T? data, INatsSerializer ser
}
}

internal ValueTask<T> SubAsync<T>(string subject, string? queueGroup, INatsSubBuilder<T> builder, CancellationToken cancellationToken = default)
internal ValueTask<T> SubAsync<T>(string subject, NatsSubOpts? opts, INatsSubBuilder<T> builder, CancellationToken cancellationToken = default)
where T : INatsSub
{
var sub = builder.Build(subject, queueGroup, this, _subscriptionManager);
var sub = builder.Build(subject, opts, connection: this, _subscriptionManager);
if (ConnectionState == NatsConnectionState.Open)
{
return _subscriptionManager.SubscribeAsync(subject, queueGroup, sub, cancellationToken);
return _subscriptionManager.SubscribeAsync(subject, opts, sub, cancellationToken);
}
else
{
return WithConnectAsync(subject, queueGroup, sub, cancellationToken, static (self, s, qg, sb, token) =>
return WithConnectAsync(subject, opts, sub, cancellationToken, static (self, s, o, sb, token) =>
{
return self._subscriptionManager.SubscribeAsync(s, qg, sb, token);
return self._subscriptionManager.SubscribeAsync(s, o, sb, token);
});
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/NatsConnection.Publish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = d
}

/// <inheritdoc />
public ValueTask PublishAsync(NatsMsg msg, CancellationToken cancellationToken = default)
public ValueTask PublishAsync(in NatsMsg msg, CancellationToken cancellationToken = default)
{
return PublishAsync(msg.Subject, msg.Data, default, cancellationToken);
}
Expand All @@ -24,7 +24,7 @@ public ValueTask PublishAsync<T>(string subject, T? data, in NatsPubOpts? opts =
}

/// <inheritdoc />
public ValueTask PublishAsync<T>(NatsMsg<T> msg, CancellationToken cancellationToken = default)
public ValueTask PublishAsync<T>(in NatsMsg<T> msg, CancellationToken cancellationToken = default)
{
return PublishAsync<T>(msg.Subject, msg.Data, default, cancellationToken);
}
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/NatsConnection.Subscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ public partial class NatsConnection
/// <inheritdoc />
public ValueTask<NatsSub> SubscribeAsync(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
{
return SubAsync<NatsSub>(subject, opts?.QueueGroup, NatsSubBuilder.Default, cancellationToken);
return SubAsync<NatsSub>(subject, opts, NatsSubBuilder.Default, cancellationToken);
}

/// <inheritdoc />
public ValueTask<NatsSub<T>> SubscribeAsync<T>(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
{
var serializer = opts?.Serializer ?? Options.Serializer;
return SubAsync<NatsSub<T>>(subject, opts?.QueueGroup, NatsSubModelBuilder<T>.For(serializer), cancellationToken);
return SubAsync<NatsSub<T>>(subject, opts, NatsSubModelBuilder<T>.For(serializer), cancellationToken);
}
}
10 changes: 8 additions & 2 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ internal ValueTask PostPongAsync()
}

// called only internally
internal ValueTask SubscribeCoreAsync(int sid, string subject, string? queueGroup, CancellationToken cancellationToken)
internal ValueTask SubscribeCoreAsync(int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken)
{
var command = AsyncSubscribeCommand.Create(_pool, GetCancellationTimer(cancellationToken), sid, subject, queueGroup);
var command = AsyncSubscribeCommand.Create(_pool, GetCancellationTimer(cancellationToken), sid, subject, queueGroup, maxMsgs);
return EnqueueAndAwaitCommandAsync(command);
}

Expand Down Expand Up @@ -777,6 +777,12 @@ private async ValueTask<TResult> WithConnectAsync<T1, T2, T3, T4, TResult>(T1 it
await ConnectAsync().ConfigureAwait(false);
return await coreAsync(this, item1, item2, item3, item4).ConfigureAwait(false);
}

private async ValueTask<TResult> WithConnectAsync<T1, T2, T3, T4, T5, TResult>(T1 item1, T2 item2, T3 item3, T4 item4, T5 item5, Func<NatsConnection, T1, T2, T3, T4, T5, ValueTask<TResult>> coreAsync)
{
await ConnectAsync().ConfigureAwait(false);
return await coreAsync(this, item1, item2, item3, item4, item5).ConfigureAwait(false);
}
}

// This writer state is reused when reconnecting.
Expand Down
Loading