From b4feb50263428f521634e71c0ee9cb3840066692 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 30 Jun 2023 13:13:59 +0100 Subject: [PATCH] Auto-unsubscribe reconnect test --- tests/NATS.Client.Core.Tests/ProtocolTest.cs | 71 +++++++++++++++---- tests/NATS.Client.Core.Tests/_NatsServer.cs | 73 ++++++++++++++++---- 2 files changed, 119 insertions(+), 25 deletions(-) diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index 43f35769e..4b3fdf5d0 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -1,5 +1,3 @@ -using System.Text.RegularExpressions; - namespace NATS.Client.Core.Tests; public class ProtocolTest @@ -169,7 +167,7 @@ public async Task Unsubscribe_max_msgs() var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; await using var sub = await nats.SubscribeAsync("foo", opts); - var sid = ((INatsSub) sub).Sid; + var sid = ((INatsSub)sub).Sid; await Retry.Until("all frames arrived", () => proxy.Frames.Count >= 2); Assert.Equal($"SUB foo {sid}", proxy.Frames[0].Message); Assert.Equal($"UNSUB {sid} {maxMsgs}", proxy.Frames[1].Message); @@ -195,22 +193,18 @@ public async Task Unsubscribe_max_msgs() // Manual unsubscribe { + proxy.ClearFrames(); + await using var sub = await nats.SubscribeAsync("foo2"); await sub.UnsubscribeAsync(); var sid = ((INatsSub)sub).Sid; - await Retry.Until("all frames arrived", () => proxy.Frames.Count(f => f.Message == $"UNSUB {sid}") == 1); + await Retry.Until("all frames arrived", () => proxy.ClientFrames.Count >= 2); - // Frames we're interested in would be somewhere down the list - // since we ran other tests above. - var index = proxy.ClientFrames - .Select((f, i) => (frame: f, Index: i)) - .Single(fi => fi.frame.Message == $"SUB foo2 {sid}") - .Index; - Assert.Matches($"SUB foo2 {sid}", proxy.ClientFrames[index].Message); - Assert.Matches($"UNSUB {sid}", proxy.ClientFrames[index + 1].Message); + Assert.Equal($"SUB foo2 {sid}", proxy.ClientFrames[0].Message); + Assert.Equal($"UNSUB {sid}", proxy.ClientFrames[1].Message); // send messages to check we receive none since we're already unsubscribed for (var i = 0; i < 100; i++) @@ -231,5 +225,58 @@ public async Task Unsubscribe_max_msgs() Assert.Equal(0, count); Assert.Equal(NatsSubEndReason.None, sub.EndReason); } + + // Reconnect + { + proxy.Reset(); + + const int maxMsgs = 100; + const int pubMsgs = 10; + var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; + var sub = await nats.SubscribeAsync("foo3", opts); + var count = 0; + var reg = sub.Register(_ => Interlocked.Increment(ref count)); + var sid = ((INatsSub)sub).Sid; + await Retry.Until("subscribed", () => proxy.Frames.Any(f => f.Message == $"SUB foo3 {sid}")); + + for (var i = 0; i < pubMsgs; i++) + { + await nats.PublishAsync("foo3", i); + } + + await Retry.Until("published", () => proxy.Frames.Count(f => f.Message.StartsWith("PUB foo3")) == 10); + await Retry.Until("received", () => Volatile.Read(ref count) == 10); + + var pending = maxMsgs - pubMsgs; + Assert.Equal(pending, ((INatsSub)sub).PendingMsgs); + + proxy.Reset(); + + // SUB + UNSUB + await Retry.Until("re-subscribed", () => proxy.ClientFrames.Count == 2); + + // Make sure we're still using the same SID + Assert.Equal($"SUB foo3 {sid}", proxy.ClientFrames[0].Message); + Assert.Equal($"UNSUB {sid} {pending}", proxy.ClientFrames[1].Message); + + // We already published a few, this should exceed max-msgs + for (var i = 0; i < maxMsgs; i++) + { + await nats.PublishAsync("foo3", i); + } + + await Retry.Until( + "published more", + () => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB foo3")) == maxMsgs); + + await Retry.Until( + "unsubscribed with max-msgs", + () => sub.EndReason == NatsSubEndReason.MaxMsgs); + + Assert.Equal(Volatile.Read(ref count), maxMsgs); + + await sub.DisposeAsync(); + await reg; + } } } diff --git a/tests/NATS.Client.Core.Tests/_NatsServer.cs b/tests/NATS.Client.Core.Tests/_NatsServer.cs index 64c2f9caa..db464ad4b 100644 --- a/tests/NATS.Client.Core.Tests/_NatsServer.cs +++ b/tests/NATS.Client.Core.Tests/_NatsServer.cs @@ -296,31 +296,40 @@ public class NatsProxy : IDisposable { private readonly ITestOutputHelper _outputHelper; private readonly TcpListener _tcpListener; + private readonly List _clients = new(); private readonly List _frames = new(); + private readonly Stopwatch _watch = new(); public NatsProxy(int port, ITestOutputHelper outputHelper) { _outputHelper = outputHelper; _tcpListener = new TcpListener(IPAddress.Loopback, 0); _tcpListener.Start(); + _watch.Restart(); Task.Run(() => { var client = 0; while (true) { - var tcpClient1 = _tcpListener.AcceptTcpClient(); - tcpClient1.NoDelay = true; - tcpClient1.ReceiveBufferSize = 0; - tcpClient1.SendBufferSize = 0; + TcpClient tcpClient1 = _tcpListener.AcceptTcpClient(); + TcpClient tcpClient2; + lock (_clients) + { + tcpClient1.NoDelay = true; + tcpClient1.ReceiveBufferSize = 0; + tcpClient1.SendBufferSize = 0; + _clients.Add(tcpClient1); + + tcpClient2 = new TcpClient("127.0.0.1", port); + tcpClient2.NoDelay = true; + tcpClient2.ReceiveBufferSize = 0; + tcpClient2.SendBufferSize = 0; + _clients.Add(tcpClient2); + } var n = client++; - var tcpClient2 = new TcpClient("127.0.0.1", port); - tcpClient2.NoDelay = true; - tcpClient2.ReceiveBufferSize = 0; - tcpClient2.SendBufferSize = 0; - #pragma warning disable CS4014 Task.Run(() => { @@ -366,6 +375,17 @@ public NatsProxy(int port, ITestOutputHelper outputHelper) public int Port => ((IPEndPoint)_tcpListener.Server.LocalEndPoint!).Port; + public IReadOnlyList AllFrames + { + get + { + lock (_frames) + { + return _frames.ToList(); + } + } + } + public IReadOnlyList Frames { get @@ -383,6 +403,33 @@ public IReadOnlyList Frames public IReadOnlyList ServerFrames => Frames.Where(f => f.Origin == "S").ToList(); + public void Reset() + { + lock (_clients) + { + foreach (var tcpClient in _clients) + { + try + { + tcpClient.Close(); + } + catch + { + // ignore + } + } + + ClearFrames(); + + _watch.Restart(); + } + } + + public void ClearFrames() + { + lock (_frames) _frames.Clear(); + } + public void Dispose() => _tcpListener.Server.Dispose(); private bool NatsProtoDump(int client, string origin, TextReader sr, TextWriter sw) @@ -402,7 +449,7 @@ private bool NatsProtoDump(int client, string origin, TextReader sr, TextWriter if (Regex.IsMatch(message, @"^(INFO|CONNECT|PING|PONG|UNSUB|SUB|\+OK|-ERR)")) { if (client > 0) - AddFrame(new Frame(client, origin, message)); + AddFrame(new Frame(_watch.Elapsed, client, origin, message)); sw.WriteLine(message); sw.Flush(); @@ -448,13 +495,13 @@ private bool NatsProtoDump(int client, string origin, TextReader sr, TextWriter sw.Flush(); if (client > 0) - AddFrame(new Frame(client, origin, Message: $"{message}␍␊{sb}")); + AddFrame(new Frame(_watch.Elapsed, client, origin, Message: $"{message}␍␊{sb}")); return true; } if (client > 0) - AddFrame(new Frame(client, Origin: "ERROR", Message: $"Unknown protocol: {message}")); + AddFrame(new Frame(_watch.Elapsed, client, Origin: "ERROR", Message: $"Unknown protocol: {message}")); return false; } @@ -467,7 +514,7 @@ private void AddFrame(Frame frame) private void Log(string text) => _outputHelper.WriteLine($"{DateTime.Now:HH:mm:ss.fff} [PROXY] {text}"); - public record Frame(int Client, string Origin, string Message); + public record Frame(TimeSpan Timestamp, int Client, string Origin, string Message); } public class NullOutputHelper : ITestOutputHelper