Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed Reply extension method #77

Merged
merged 3 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions docs/documentation/queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Queue Groups

When subscribers register themselves to receive messages from a publisher,
the 1:N fan-out pattern of messaging ensures that any message sent by a publisher,
reaches all subscribers that have registered. NATS provides an additional feature
named "queue", which allows subscribers to register themselves as part of a queue.
Subscribers that are part of a queue, form the "queue group".

Code below demonstrates multiple subscriptions on the same queue group,
receiving messages randomly distributed among them. This example also shows
how queue groups can be used to load balance responders:

```csharp
using NATS.Client.Core;

await using var nats = new NatsConnection();

var subs = new List<NatsSubBase>();
var replyTasks = new List<Task>();

for (int i = 0; i < 3; i++)
{
// Create three subscriptions all on the same queue group
var opts = new NatsSubOpts { QueueGroup = "maths-service" };
var sub = await nats.SubscribeAsync<int>("math.double", opts);

subs.Add(sub);

// Create a background message loop for every subscription
var replyTaskId = i;
replyTasks.Add(Task.Run(async () =>
{
// Retrieve messages until unsubscribed
await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Console.WriteLine($"[{replyTaskId}] Received request: {msg.Data}");
await msg.ReplyAsync($"Answer is: {2 * msg.Data}");
}

Console.WriteLine($"[{replyTaskId}] Done");
}));
}

// Send a few requests
for (int i = 0; i < 10; i++)
{
var reply = await nats.RequestAsync<int, string>("math.double", i);
Console.WriteLine($"Reply: '{reply}'");
}

Console.WriteLine("Stopping...");

// Unsubscribing or disposing will complete the message loops
foreach (var sub in subs)
await sub.UnsubscribeAsync();

// Make sure all tasks finished cleanly
await Task.WhenAll(replyTasks);

Console.WriteLine("Bye");
```

Output should look similar to this:

```
[0] Received request: 0
Reply: 'Answer is: 0'
[2] Received request: 1
Reply: 'Answer is: 2'
[1] Received request: 2
Reply: 'Answer is: 4'
[0] Received request: 3
Reply: 'Answer is: 6'
[0] Received request: 4
Reply: 'Answer is: 8'
[1] Received request: 5
Reply: 'Answer is: 10'
[2] Received request: 6
Reply: 'Answer is: 12'
[0] Received request: 7
Reply: 'Answer is: 14'
[1] Received request: 8
Reply: 'Answer is: 16'
[0] Received request: 9
Reply: 'Answer is: 18'
Stopping...
[0] Done
[1] Done
[2] Done
Bye
```
19 changes: 15 additions & 4 deletions docs/documentation/req-rep.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,26 @@ Request-Reply is a common pattern in modern distributed systems.
A request is sent, and the application either waits on the response with a certain timeout,
or receives a response asynchronously.

Create a service that will be responding to requests:
```csharp
await using var nats = new NatsConnection();

await using var replyHandle = await nats.ReplyAsync<int, string>("math.double", x =>
await using var sub = await conn.SubscribeAsync<int>("math.double");

await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Console.WriteLine($"Received request: {x}")
return $"Answer is: { 2 * x }";
});
Console.WriteLine($"Received request: {msg.Data}");

await msg.ReplyAsync($"Answer is: { 2 * msg.Data }");
}
```

Reply to a request is asynchronously received using an _inbox_ subscription
behind the scenes:
```csharp
await using var nats = new NatsConnection();

var reply = await nats.RequestAsync<int, string>("math.double", 2);

Console.WriteLine($"Received reply: {reply}")
```
7 changes: 5 additions & 2 deletions docs/documentation/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

- name: Request-Reply
href: req-rep.md


- name: Queue Groups
href: queue.md

- name: Updating Documentation
href: update-docs.md
href: update-docs.md
37 changes: 25 additions & 12 deletions sandbox/BlazorWasm/Server/NatsServices/WeatherForecastService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ public class WeatherForecastService : IHostedService, IAsyncDisposable

private readonly ILogger<WeatherForecastService> _logger;
private readonly INatsConnection _natsConnection;
private NatsReplyHandle? _replyHandle;
private NatsSub<object>? _replySubscription;
private Task? _replyTask;

public WeatherForecastService(ILogger<WeatherForecastService> logger, INatsConnection natsConnection)
{
Expand All @@ -22,23 +23,35 @@ public WeatherForecastService(ILogger<WeatherForecastService> logger, INatsConne

public async Task StartAsync(CancellationToken cancellationToken)
{
_replyHandle = await _natsConnection.ReplyAsync<object, WeatherForecast[]>("weather", req =>
{
return Enumerable.Range(1, 5).Select(index => new WeatherForecast
_replySubscription = await _natsConnection.SubscribeAsync<object>("weather", cancellationToken: cancellationToken);
_replyTask = Task.Run(
async () =>
{
Date = DateTime.Now.AddDays(index),
TemperatureC = Random.Shared.Next(-20, 55),
Summary = Summaries[Random.Shared.Next(Summaries.Length)],
}).ToArray();
});
await foreach (var msg in _replySubscription.Msgs.ReadAllAsync(cancellationToken))
{
var forecasts = Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
Date = DateTime.Now.AddDays(index),
TemperatureC = Random.Shared.Next(-20, 55),
Summary = Summaries[Random.Shared.Next(Summaries.Length)],
}).ToArray();
await msg.ReplyAsync(forecasts, cancellationToken: cancellationToken);
}
},
cancellationToken);
_logger.LogInformation("Weather Forecast Services is running");
}

public Task StopAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Weather Forecast Services is stopping");
return Task.CompletedTask;
if (_replySubscription != null) await _replySubscription.UnsubscribeAsync();
if (_replyTask != null) await _replyTask;
}

public ValueTask DisposeAsync() => _replyHandle?.DisposeAsync() ?? ValueTask.CompletedTask;
public async ValueTask DisposeAsync()
{
if (_replySubscription != null) await _replySubscription.DisposeAsync();
if (_replyTask != null) await _replyTask;
}
}
12 changes: 11 additions & 1 deletion sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,21 @@
};

// Server
await conn.ReplyAsync("foobar", (int x) => $"Hello {x}");
var sub = await conn.SubscribeAsync<int>("foobar");
var replyTask = Task.Run(async () =>
{
await foreach (var msg in sub.Msgs.ReadAllAsync())
{
await msg.ReplyAsync($"Hello {msg.Data}");
}
});

// Client(response: "Hello 100")
var response = await conn.RequestAsync<int, string>("foobar", 100);

await sub.UnsubscribeAsync();
await replyTask;

// subscribe
var subscription = await conn.SubscribeAsync<Person>("foo");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,8 @@

namespace NATS.Client.Core;

public readonly struct NatsReplyHandle : IAsyncDisposable
public static class NatsRequestExtensions
{
private readonly NatsSubBase _sub;
private readonly Task _reader;

internal NatsReplyHandle(NatsSubBase sub, Task reader)
{
_sub = sub;
_reader = reader;
}

public async ValueTask DisposeAsync()
{
await _sub.DisposeAsync().ConfigureAwait(false);
await _reader.ConfigureAwait(false);
}
}

public static class NatReplyUtils
{
/// <summary>
/// Create a responder using the NATS Request-Reply pattern, with a single response.
/// </summary>
/// <param name="nats">NATS connection</param>
/// <param name="subject">Subject to subscribed to</param>
/// <param name="reply">Callback to prepare replies to incoming requests. Exceptions will be handled and a default response will be sent. You should implement your own exception handling.</param>
/// <typeparam name="TRequest">Incoming request type</typeparam>
/// <typeparam name="TResponse">Reply or response type to be sent to requesters</typeparam>
/// <returns>A disposable handler to keep track of subscription. Dispose to unsubscribe and wait for reply callback to exit.</returns>
public static async Task<NatsReplyHandle> ReplyAsync<TRequest, TResponse>(this INatsConnection nats, string subject, Func<TRequest?, TResponse> reply)
{
var sub = await nats.SubscribeAsync<TRequest>(subject).ConfigureAwait(false);
var reader = Task.Run(async () =>
{
await foreach (var msg in sub.Msgs.ReadAllAsync())
{
try
{
var response = reply(msg.Data);
await msg.ReplyAsync(response).ConfigureAwait(false);
}
catch
{
await msg.ReplyAsync(default(TResponse)).ConfigureAwait(false);
}
}
});
return new NatsReplyHandle(sub, reader);
}

/// <summary>
/// Request data using the NATSRequest-Reply pattern with a single response.
/// </summary>
Expand Down
22 changes: 15 additions & 7 deletions tests/NATS.Client.Core.Tests/NatsConnectionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,22 @@ public async Task RequestTest(int minSize)
var text = new StringBuilder(minSize).Insert(0, "a", minSize).ToString();

var sync = 0;
await using var replyHandle = await subConnection.ReplyAsync<int, string>(subject, x =>
var sub = await subConnection.SubscribeAsync<int>(subject);
var reg = sub.Register(async m =>
{
if (x < 10)
if (m.Data < 10)
{
Interlocked.Exchange(ref sync, m.Data);
await m.ReplyAsync( "sync");
}

if (m.Data == 100)
{
Interlocked.Exchange(ref sync, x);
return "sync";
await m.ReplyAsync(default(string));
return;
}

if (x == 100)
throw new Exception();
return text + x;
await m.ReplyAsync(text + m.Data);
});

await Retry.Until(
Expand All @@ -139,6 +144,9 @@ await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
await pubConnection.RequestAsync<int, string>("foo", 10, timeout: TimeSpan.FromSeconds(2));
});

await sub.DisposeAsync();
await reg;
}

[Fact]
Expand Down