Skip to content

Commit

Permalink
Auto-unsubscribe reconnect test
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Jun 30, 2023
1 parent 21a3833 commit b4feb50
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 25 deletions.
71 changes: 59 additions & 12 deletions tests/NATS.Client.Core.Tests/ProtocolTest.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using System.Text.RegularExpressions;

namespace NATS.Client.Core.Tests;

public class ProtocolTest
Expand Down Expand Up @@ -169,7 +167,7 @@ public async Task Unsubscribe_max_msgs()
var opts = new NatsSubOpts { MaxMsgs = maxMsgs };
await using var sub = await nats.SubscribeAsync<int>("foo", opts);

var sid = ((INatsSub) sub).Sid;
var sid = ((INatsSub)sub).Sid;
await Retry.Until("all frames arrived", () => proxy.Frames.Count >= 2);
Assert.Equal($"SUB foo {sid}", proxy.Frames[0].Message);
Assert.Equal($"UNSUB {sid} {maxMsgs}", proxy.Frames[1].Message);
Expand All @@ -195,22 +193,18 @@ public async Task Unsubscribe_max_msgs()

// Manual unsubscribe
{
proxy.ClearFrames();

await using var sub = await nats.SubscribeAsync<int>("foo2");

await sub.UnsubscribeAsync();

var sid = ((INatsSub)sub).Sid;

await Retry.Until("all frames arrived", () => proxy.Frames.Count(f => f.Message == $"UNSUB {sid}") == 1);
await Retry.Until("all frames arrived", () => proxy.ClientFrames.Count >= 2);

// Frames we're interested in would be somewhere down the list
// since we ran other tests above.
var index = proxy.ClientFrames
.Select((f, i) => (frame: f, Index: i))
.Single(fi => fi.frame.Message == $"SUB foo2 {sid}")
.Index;
Assert.Matches($"SUB foo2 {sid}", proxy.ClientFrames[index].Message);
Assert.Matches($"UNSUB {sid}", proxy.ClientFrames[index + 1].Message);
Assert.Equal($"SUB foo2 {sid}", proxy.ClientFrames[0].Message);
Assert.Equal($"UNSUB {sid}", proxy.ClientFrames[1].Message);

// send messages to check we receive none since we're already unsubscribed
for (var i = 0; i < 100; i++)
Expand All @@ -231,5 +225,58 @@ public async Task Unsubscribe_max_msgs()
Assert.Equal(0, count);
Assert.Equal(NatsSubEndReason.None, sub.EndReason);
}

// Reconnect
{
proxy.Reset();

const int maxMsgs = 100;
const int pubMsgs = 10;
var opts = new NatsSubOpts { MaxMsgs = maxMsgs };
var sub = await nats.SubscribeAsync<int>("foo3", opts);
var count = 0;
var reg = sub.Register(_ => Interlocked.Increment(ref count));
var sid = ((INatsSub)sub).Sid;
await Retry.Until("subscribed", () => proxy.Frames.Any(f => f.Message == $"SUB foo3 {sid}"));

for (var i = 0; i < pubMsgs; i++)
{
await nats.PublishAsync("foo3", i);
}

await Retry.Until("published", () => proxy.Frames.Count(f => f.Message.StartsWith("PUB foo3")) == 10);
await Retry.Until("received", () => Volatile.Read(ref count) == 10);

var pending = maxMsgs - pubMsgs;
Assert.Equal(pending, ((INatsSub)sub).PendingMsgs);

proxy.Reset();

// SUB + UNSUB
await Retry.Until("re-subscribed", () => proxy.ClientFrames.Count == 2);

// Make sure we're still using the same SID
Assert.Equal($"SUB foo3 {sid}", proxy.ClientFrames[0].Message);
Assert.Equal($"UNSUB {sid} {pending}", proxy.ClientFrames[1].Message);

// We already published a few, this should exceed max-msgs
for (var i = 0; i < maxMsgs; i++)
{
await nats.PublishAsync("foo3", i);
}

await Retry.Until(
"published more",
() => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB foo3")) == maxMsgs);

await Retry.Until(
"unsubscribed with max-msgs",
() => sub.EndReason == NatsSubEndReason.MaxMsgs);

Assert.Equal(Volatile.Read(ref count), maxMsgs);

await sub.DisposeAsync();
await reg;
}
}
}
73 changes: 60 additions & 13 deletions tests/NATS.Client.Core.Tests/_NatsServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,31 +296,40 @@ public class NatsProxy : IDisposable
{
private readonly ITestOutputHelper _outputHelper;
private readonly TcpListener _tcpListener;
private readonly List<TcpClient> _clients = new();
private readonly List<Frame> _frames = new();
private readonly Stopwatch _watch = new();

public NatsProxy(int port, ITestOutputHelper outputHelper)
{
_outputHelper = outputHelper;
_tcpListener = new TcpListener(IPAddress.Loopback, 0);
_tcpListener.Start();
_watch.Restart();

Task.Run(() =>
{
var client = 0;
while (true)
{
var tcpClient1 = _tcpListener.AcceptTcpClient();
tcpClient1.NoDelay = true;
tcpClient1.ReceiveBufferSize = 0;
tcpClient1.SendBufferSize = 0;
TcpClient tcpClient1 = _tcpListener.AcceptTcpClient();
TcpClient tcpClient2;
lock (_clients)
{
tcpClient1.NoDelay = true;
tcpClient1.ReceiveBufferSize = 0;
tcpClient1.SendBufferSize = 0;
_clients.Add(tcpClient1);
tcpClient2 = new TcpClient("127.0.0.1", port);
tcpClient2.NoDelay = true;
tcpClient2.ReceiveBufferSize = 0;
tcpClient2.SendBufferSize = 0;
_clients.Add(tcpClient2);
}
var n = client++;
var tcpClient2 = new TcpClient("127.0.0.1", port);
tcpClient2.NoDelay = true;
tcpClient2.ReceiveBufferSize = 0;
tcpClient2.SendBufferSize = 0;
#pragma warning disable CS4014
Task.Run(() =>
{
Expand Down Expand Up @@ -366,6 +375,17 @@ public NatsProxy(int port, ITestOutputHelper outputHelper)

public int Port => ((IPEndPoint)_tcpListener.Server.LocalEndPoint!).Port;

public IReadOnlyList<Frame> AllFrames
{
get
{
lock (_frames)
{
return _frames.ToList();
}
}
}

public IReadOnlyList<Frame> Frames
{
get
Expand All @@ -383,6 +403,33 @@ public IReadOnlyList<Frame> Frames

public IReadOnlyList<Frame> ServerFrames => Frames.Where(f => f.Origin == "S").ToList();

public void Reset()
{
lock (_clients)
{
foreach (var tcpClient in _clients)
{
try
{
tcpClient.Close();
}
catch
{
// ignore
}
}

ClearFrames();

_watch.Restart();
}
}

public void ClearFrames()
{
lock (_frames) _frames.Clear();
}

public void Dispose() => _tcpListener.Server.Dispose();

private bool NatsProtoDump(int client, string origin, TextReader sr, TextWriter sw)
Expand All @@ -402,7 +449,7 @@ private bool NatsProtoDump(int client, string origin, TextReader sr, TextWriter
if (Regex.IsMatch(message, @"^(INFO|CONNECT|PING|PONG|UNSUB|SUB|\+OK|-ERR)"))
{
if (client > 0)
AddFrame(new Frame(client, origin, message));
AddFrame(new Frame(_watch.Elapsed, client, origin, message));

sw.WriteLine(message);
sw.Flush();
Expand Down Expand Up @@ -448,13 +495,13 @@ private bool NatsProtoDump(int client, string origin, TextReader sr, TextWriter
sw.Flush();

if (client > 0)
AddFrame(new Frame(client, origin, Message: $"{message}␍␊{sb}"));
AddFrame(new Frame(_watch.Elapsed, client, origin, Message: $"{message}␍␊{sb}"));

return true;
}

if (client > 0)
AddFrame(new Frame(client, Origin: "ERROR", Message: $"Unknown protocol: {message}"));
AddFrame(new Frame(_watch.Elapsed, client, Origin: "ERROR", Message: $"Unknown protocol: {message}"));

return false;
}
Expand All @@ -467,7 +514,7 @@ private void AddFrame(Frame frame)

private void Log(string text) => _outputHelper.WriteLine($"{DateTime.Now:HH:mm:ss.fff} [PROXY] {text}");

public record Frame(int Client, string Origin, string Message);
public record Frame(TimeSpan Timestamp, int Client, string Origin, string Message);
}

public class NullOutputHelper : ITestOutputHelper
Expand Down

0 comments on commit b4feb50

Please sign in to comment.