From 84b5193a0799ad3c6150785850a8ff0e66f97d65 Mon Sep 17 00:00:00 2001 From: Daniel Wertheim Date: Mon, 2 Dec 2019 17:35:33 +0100 Subject: [PATCH] Mostly tests work (#348) * Ensures connection and subscriptions in tests are disposed Some cleaning as well * Stops using latest lang version for now * Certificate has been moved * Fixes shut down of test server * Adds NatsTestCase allowing for automatic retries Currently only for NATSNoServersException * Adds missing header * Uses NatsFact for integration tests * Adds deadlock tests for unsub * Fixes NATS-Server CreateFastAndVerify * Fixes name of argument in exception * Reverts custom fact Changes to VSTest and enables re-run of integration tests. --- NATS.Client.snk | Bin 596 -> 0 bytes az-templates/stage-build.yml | 48 +- azure-pipelines-ci.yml | 1 - src/Directory.Build.props | 1 - src/NATS.Client/IEncodedConnection.cs | 2 +- src/NATS.Client/Rx/Ops/WhereObservable.cs | 2 +- .../IntegrationTests/IntegrationTests.csproj | 2 +- .../TestAsyncAwaitDeadlocks.cs | 156 +++- .../IntegrationTests/TestAuthorization.cs | 85 +- src/Tests/IntegrationTests/TestBasic.cs | 417 +++++----- src/Tests/IntegrationTests/TestCluster.cs | 48 +- src/Tests/IntegrationTests/TestConnection.cs | 736 ++++++++++-------- src/Tests/IntegrationTests/TestEncoding.cs | 94 ++- src/Tests/IntegrationTests/TestReconnect.cs | 400 +++++----- .../IntegrationTests/TestSubscriptions.cs | 490 ++++++------ src/Tests/IntegrationTests/TestTLS.cs | 51 +- src/Tests/IntegrationTests/TestUtilities.cs | 21 +- src/Tests/UnitTests/UnitTests.csproj | 2 +- 18 files changed, 1429 insertions(+), 1127 deletions(-) delete mode 100644 NATS.Client.snk diff --git a/NATS.Client.snk b/NATS.Client.snk deleted file mode 100644 index 32d3d3ac9411a7f1aeb62c6636bb6dadec030992..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 596 zcmV-a0;~N80ssI2Bme+XQ$aES1ONa50098oeWCLBkcl#TN78L+W^b(^h94y9tlJZo zU$(|g{^;6Rn@L^sdUfeE-MY&MQlup`Qoi8cKKd()l4Jq@CH7# z@3H6DB<_l7TV#9@Jd%S?1f!@wwhu#C6y5d2<@Rb8fjPv%v84(=V8LY3fZ}Db@BqzG z|67NTTQ`OTpSOdOHaJ*ZH0Xyq3qJ3_%*F(tJQh>K-lTqV}$wz`MACB`n*2X>OM^* z|1OkApYY#2jGB?fK35=;#&dQ@-`Js&53={3;1~y7id%nY8V@DcNGgMzC%%IKyxe2 zRt7r{jdisdzWn6R`Y7VW0|UY}B*b9ThWiTd6T!JtKus^qx3yR%Mh2sND;EXlISjEK z;!)lX&UK@q#*=1hgbPK&xrHdEmag3`JxbDI&AQB)lId<#HBv~Ml3oP#xHKP?ZC@uq zI1!t(oI+Az22+&O?lcK0t_KV+M|$wn5b-?C`{J;8T)@2wjuM`*c%_0v$N}cb1iyI| i4qvqf#>u5imaw-MgMz01-1wUOKk9MJopns17^}63S|9cR diff --git a/az-templates/stage-build.yml b/az-templates/stage-build.yml index e94403516..ed36a68bc 100644 --- a/az-templates/stage-build.yml +++ b/az-templates/stage-build.yml @@ -13,20 +13,20 @@ stages: projects: 'src/*.sln' arguments: '-c $(BuildConfiguration) --no-incremental --nologo -p:TreatWarningsAsErrors=true -p:Version=$(SemVer) -p:InformationalVersion=$(InfoVer)' - - task: DotNetCoreCLI@2 + - task: VSTest@2 displayName: 'UnitTests .NetCoreApp2.2' inputs: - command: test - projects: 'src/Tests/**/UnitTests.csproj' - arguments: '-c $(BuildConfiguration) -f netcoreapp2.2 --no-build' + testSelector: 'testAssemblies' + testAssemblyVer2: 'src/Tests/UnitTests/bin/$(BuildConfiguration)/netcoreapp2.2/UnitTests.dll' + configuration: $(BuildConfiguration) testRunTitle: 'UnitTests .NetCoreApp2.2' - - task: DotNetCoreCLI@2 + - task: VSTest@2 displayName: 'UnitTests .Net4.5.2' inputs: - command: test - projects: 'src/Tests/**/UnitTests.csproj' - arguments: '-c $(BuildConfiguration) -f net452 --no-build' + testSelector: 'testAssemblies' + testAssemblyVer2: 'src/Tests/UnitTests/bin/$(BuildConfiguration)/net452/UnitTests.dll' + configuration: $(BuildConfiguration) testRunTitle: 'UnitTests .Net4.5.2' - task: PowerShell@2 @@ -62,29 +62,35 @@ stages: $natsServerDir = Get-ChildItem -Directory -Path "$(Agent.TempDirectory)\nats-server\nats-server-*" | Select -Expand FullName Write-Host "Found nats-server path: $natsServerDir" Write-Host "##vso[task.setvariable variable=PATH;]${env:PATH};$natsServerDir" - - - task: DotNetCoreCLI@2 + + - task: VSTest@2 displayName: 'IntegrationTests .NetCoreApp2.2' inputs: - command: test - projects: 'src/Tests/**/IntegrationTests.csproj' - arguments: '-c $(BuildConfiguration) -f netcoreapp2.2 --no-build' + testSelector: 'testAssemblies' + testAssemblyVer2: 'src/Tests/IntegrationTests/bin/$(BuildConfiguration)/netcoreapp2.2/IntegrationTests.dll' + configuration: $(BuildConfiguration) + rerunFailedTests: True + rerunMaxAttempts: 2 testRunTitle: 'IntegrationTests .NetCoreApp2.2' - - task: DotNetCoreCLI@2 + - task: VSTest@2 displayName: 'IntegrationTests .NetCoreApp3.0' inputs: - command: test - projects: 'src/Tests/**/IntegrationTests.csproj' - arguments: '-c $(BuildConfiguration) -f netcoreapp3.0 --no-build' + testSelector: 'testAssemblies' + testAssemblyVer2: 'src/Tests/IntegrationTests/bin/$(BuildConfiguration)/netcoreapp3.0/IntegrationTests.dll' + configuration: $(BuildConfiguration) + rerunFailedTests: True + rerunMaxAttempts: 2 testRunTitle: 'IntegrationTests .NetCoreApp3.0' - - task: DotNetCoreCLI@2 + - task: VSTest@2 displayName: 'IntegrationTests .Net4.5.2' inputs: - command: test - projects: 'src/Tests/**/IntegrationTests.csproj' - arguments: '-c $(BuildConfiguration) -f net452 --no-build' + testSelector: 'testAssemblies' + testAssemblyVer2: 'src/Tests/IntegrationTests/bin/$(BuildConfiguration)/net452/IntegrationTests.dll' + configuration: $(BuildConfiguration) + rerunFailedTests: True + rerunMaxAttempts: 2 testRunTitle: 'IntegrationTests .Net4.5.2' - task: DotNetCoreCLI@2 diff --git a/azure-pipelines-ci.yml b/azure-pipelines-ci.yml index 41a87e086..a18d6cada 100644 --- a/azure-pipelines-ci.yml +++ b/azure-pipelines-ci.yml @@ -18,7 +18,6 @@ pr: - master pool: - name: Azure Pipelines vmImage: windows-2019 stages: diff --git a/src/Directory.Build.props b/src/Directory.Build.props index ba87d9f28..8db13ae57 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -19,6 +19,5 @@ Copyright © The NATS Authors 2016-2019 https://github.com/nats-io/nats.net Git - latest \ No newline at end of file diff --git a/src/NATS.Client/IEncodedConnection.cs b/src/NATS.Client/IEncodedConnection.cs index 75758d73b..12cdede44 100644 --- a/src/NATS.Client/IEncodedConnection.cs +++ b/src/NATS.Client/IEncodedConnection.cs @@ -179,7 +179,7 @@ public interface IEncodedConnection : IDisposable /// The is closed. /// There was an unexpected exception performing an internal NATS call while executing the /// request. See for more details. - public void FlushBuffer(); + void FlushBuffer(); /// /// Closes the and all associated diff --git a/src/NATS.Client/Rx/Ops/WhereObservable.cs b/src/NATS.Client/Rx/Ops/WhereObservable.cs index e0663291a..46896786d 100644 --- a/src/NATS.Client/Rx/Ops/WhereObservable.cs +++ b/src/NATS.Client/Rx/Ops/WhereObservable.cs @@ -39,7 +39,7 @@ private sealed class WhereObserver : IObserver public WhereObserver(IObserver observer, Func predicate) { this.observer = observer ?? throw new ArgumentNullException(nameof(observer)); - this.predicate = predicate ?? throw new ArgumentNullException(nameof(observer)); + this.predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); } public void OnNext(T value) diff --git a/src/Tests/IntegrationTests/IntegrationTests.csproj b/src/Tests/IntegrationTests/IntegrationTests.csproj index 0b3c8c4eb..bf9d0be0c 100644 --- a/src/Tests/IntegrationTests/IntegrationTests.csproj +++ b/src/Tests/IntegrationTests/IntegrationTests.csproj @@ -9,7 +9,7 @@ - + all diff --git a/src/Tests/IntegrationTests/TestAsyncAwaitDeadlocks.cs b/src/Tests/IntegrationTests/TestAsyncAwaitDeadlocks.cs index 9564d73ee..ab7227004 100644 --- a/src/Tests/IntegrationTests/TestAsyncAwaitDeadlocks.cs +++ b/src/Tests/IntegrationTests/TestAsyncAwaitDeadlocks.cs @@ -1,5 +1,19 @@ -#if !NET452 +// Copyright 2015-2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#if !NET452 +using System; using System.Collections.Concurrent; using System.Threading.Tasks; using NATS.Client; @@ -272,9 +286,8 @@ public void EnsurePubSubWithAsyncHandler() { using (var cn = Context.ConnectionFactory.CreateConnection(Context.Server1.Url)) { - using (cn.SubscribeAsync(subject, async (_, m) => + using (cn.SubscribeAsync(subject, (_, m) => { - await Task.Delay(100); recieved.Enqueue(m.Message); if(recieved.Count == 5) sync.SignalComplete(); @@ -292,6 +305,143 @@ public void EnsurePubSubWithAsyncHandler() } }); } + + [Fact] + public void EnsureAutoUnsubscribeForSyncSub() + { + var subject = "de82267b22454dd7afc37c9e34a8e0ab"; + var recieved = new ConcurrentQueue(); + + AsyncContext.Run(() => + { + using (NATSServer.CreateFast(Context.Server1.Port)) + { + using (var cn = Context.ConnectionFactory.CreateConnection(Context.Server1.Url)) + { + using (var sub = cn.SubscribeSync(subject)) + { + sub.AutoUnsubscribe(1); + + cn.Publish(subject, new byte[0]); + + recieved.Enqueue(sub.NextMessage()); + + cn.Publish(subject, new byte[0]); + + Assert.Equal(1, sub.Delivered); + } + } + + Assert.Single(recieved); + } + }); + } + + [Fact] + public void EnsureAutoUnsubscribeForAsyncSub() + { + var subject = "0be903f6c9c14c10973e78ce03ad47e1"; + var recieved = new ConcurrentQueue(); + + AsyncContext.Run(async () => + { + using (NATSServer.CreateFast(Context.Server1.Port)) + { + using (var sync = TestSync.SingleActor()) + { + using (var cn = Context.ConnectionFactory.CreateConnection(Context.Server1.Url)) + { + using (var sub = cn.SubscribeAsync(subject, (_, m) => + { + recieved.Enqueue(m.Message); + sync.SignalComplete(); + })) + { + sub.AutoUnsubscribe(1); + + cn.Publish(subject, new byte[0]); + cn.Publish(subject, new byte[0]); + + sync.WaitForAll(); + + await Task.Delay(100); + Assert.Equal(1, sub.Delivered); + } + } + + Assert.Single(recieved); + } + } + }); + } + + [Fact] + public void EnsureUnsubscribeForSyncSub() + { + var subject = "b2dcc4f56fd041cb985300d4966bd1c1"; + var recieved = new ConcurrentQueue(); + + AsyncContext.Run(() => + { + using (NATSServer.CreateFast(Context.Server1.Port)) + { + using (var cn = Context.ConnectionFactory.CreateConnection(Context.Server1.Url)) + { + using (var sub = cn.SubscribeSync(subject)) + { + cn.Publish(subject, new byte[0]); + + recieved.Enqueue(sub.NextMessage()); + + sub.Unsubscribe(); + + cn.Publish(subject, new byte[0]); + Assert.Throws(sub.NextMessage); + Assert.Equal(1, sub.Delivered); + } + } + + Assert.Single(recieved); + } + }); + } + + [Fact] + public void EnsureUnsubscribeForAsyncSub() + { + var subject = "d37e3729c5c84702b836a4bb4edf7241"; + var recieved = new ConcurrentQueue(); + + AsyncContext.Run(async () => + { + using (NATSServer.CreateFast(Context.Server1.Port)) + { + using (var sync = TestSync.SingleActor()) + { + using (var cn = Context.ConnectionFactory.CreateConnection(Context.Server1.Url)) + { + using (var sub = cn.SubscribeAsync(subject, (_, m) => + { + recieved.Enqueue(m.Message); + sync.SignalComplete(); + })) + { + cn.Publish(subject, new byte[0]); + sync.WaitForAll(); + + sub.Unsubscribe(); + + cn.Publish(subject, new byte[0]); + await Task.Delay(100); + Assert.Equal(1, sub.Delivered); + } + } + + Assert.Single(recieved); + } + } + }); + } } } #endif \ No newline at end of file diff --git a/src/Tests/IntegrationTests/TestAuthorization.cs b/src/Tests/IntegrationTests/TestAuthorization.cs index 6412ef1b0..60bd418b5 100644 --- a/src/Tests/IntegrationTests/TestAuthorization.cs +++ b/src/Tests/IntegrationTests/TestAuthorization.cs @@ -15,7 +15,6 @@ using NATS.Client; using System.Threading; using System.Reflection; -using System.IO; using System.Linq; using Xunit; @@ -38,10 +37,12 @@ private void connectAndFail(string url) Options opts = Context.GetTestOptions(); opts.Url = url; opts.DisconnectedEventHandler += handleDisconnect; - IConnection c = Context.ConnectionFactory.CreateConnection(opts); - Assert.True(false, "Expected a failure; did not receive one"); - - c.Close(); + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + c.Close(); + + Assert.True(false, "Expected a failure; did not receive one"); + } } catch (Exception e) { @@ -49,7 +50,7 @@ private void connectAndFail(string url) } finally { - Assert.False(hitDisconnect > 0, "The disconnect event handler was incorrectly invoked."); + Assert.False(hitDisconnect > 0, "hitDisconnect > 0: The disconnect event handler was incorrectly invoked."); } } @@ -61,17 +62,17 @@ private void handleDisconnect(object sender, ConnEventArgs e) [Fact] public void TestAuthSuccess() { - using (NATSServer s = NATSServer.CreateWithConfig(Context.Server1.Port, "auth.conf")) + using (NATSServer.CreateWithConfig(Context.Server1.Port, "auth.conf")) { - IConnection c = Context.ConnectionFactory.CreateConnection($"nats://username:password@localhost:{Context.Server1.Port}"); - c.Close(); + using(var c = Context.ConnectionFactory.CreateConnection($"nats://username:password@localhost:{Context.Server1.Port}")) + c.Close(); } } [Fact] public void TestAuthFailure() { - using (NATSServer s = NATSServer.CreateWithConfig(Context.Server1.Port, "auth.conf")) + using (NATSServer.CreateWithConfig(Context.Server1.Port, "auth.conf")) { connectAndFail($"nats://username@localhost:{Context.Server1.Port}"); connectAndFail($"nats://username:badpass@localhost:{Context.Server1.Port}"); @@ -83,7 +84,7 @@ public void TestAuthFailure() [Fact] public void TestAuthToken() { - using (NATSServer s = NATSServer.Create(Context.Server1.Port, "-auth S3Cr3T0k3n!")) + using (NATSServer.Create(Context.Server1.Port, "-auth S3Cr3T0k3n!")) { connectAndFail(Context.Server1.Url); connectAndFail($"nats://invalid_token@localhost:{Context.Server1.Port}"); @@ -99,8 +100,8 @@ public void TestReconnectAuthTimeout() AutoResetEvent ev = new AutoResetEvent(false); using (NATSServer s1 = NATSServer.CreateWithConfig(Context.Server1.Port, "auth.conf"), - s2 = NATSServer.CreateWithConfig(Context.Server2.Port, "auth_timeout.conf"), - s3 = NATSServer.CreateWithConfig(Context.Server3.Port, "auth.conf")) + _ = NATSServer.CreateWithConfig(Context.Server2.Port, "auth_timeout.conf"), + __ = NATSServer.CreateWithConfig(Context.Server3.Port, "auth.conf")) { Options opts = Context.GetTestOptions(); @@ -116,14 +117,15 @@ public void TestReconnectAuthTimeout() ev.Set(); }; - IConnection c = Context.ConnectionFactory.CreateConnection(opts); - - s1.Shutdown(); + using (Context.ConnectionFactory.CreateConnection(opts)) + { + s1.Shutdown(); - // This should fail over to S2 where an authorization timeout occurs - // then successfully reconnect to S3. + // This should fail over to S2 where an authorization timeout occurs + // then successfully reconnect to S3. - Assert.True(ev.WaitOne(20000)); + Assert.True(ev.WaitOne(20000)); + } } } @@ -143,7 +145,7 @@ public void TestCallbackIsPerformedOnAuthFailure() { var ex = Assert.Throws(() => { - using (var cn = Context.ConnectionFactory.CreateConnection(opts)) { } + using (Context.ConnectionFactory.CreateConnection(opts)) { } }); Assert.Equal("'Authorization Violation'", ex.Message, StringComparer.OrdinalIgnoreCase); } @@ -196,12 +198,12 @@ public void TestReconnectAuthTimeoutLateClose() AutoResetEvent ev = new AutoResetEvent(false); using (NATSServer s1 = NATSServer.CreateWithConfig(Context.Server1.Port, "auth.conf"), - s2 = NATSServer.CreateWithConfig(Context.Server3.Port, "auth.conf")) + _ = NATSServer.CreateWithConfig(Context.Server3.Port, "auth.conf")) { Options opts = Context.GetTestOptions(); - opts.Servers = new string[]{ + opts.Servers = new [] { $"nats://username:password@localhost:{Context.Server1.Port}", $"nats://username:password@localhost:{Context.Server3.Port}" }; opts.NoRandomize = true; @@ -211,30 +213,33 @@ public void TestReconnectAuthTimeoutLateClose() ev.Set(); }; - IConnection c = Context.ConnectionFactory.CreateConnection(opts); + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + // inject an authorization timeout, as if it were processed by an incoming server message. + // this is done at the parser level so that parsing is also tested, + // therefore it needs reflection since Parser is an internal type. + Type parserType = typeof(Connection).Assembly.GetType("NATS.Client.Parser"); + Assert.NotNull(parserType); + + BindingFlags flags = BindingFlags.NonPublic | BindingFlags.Instance; + object parser = Activator.CreateInstance(parserType, flags, null, new object[] {c}, null); + Assert.NotNull(parser); - // inject an authorization timeout, as if it were processed by an incoming server message. - // this is done at the parser level so that parsing is also tested, - // therefore it needs reflection since Parser is an internal type. - Type parserType = typeof(Connection).Assembly.GetType("NATS.Client.Parser"); - Assert.NotNull(parserType); - BindingFlags flags = BindingFlags.NonPublic | BindingFlags.Instance; - object parser = Activator.CreateInstance(parserType, flags, null, new object[] { c }, null); - Assert.NotNull(parser); + MethodInfo parseMethod = parserType.GetMethod("parse", flags); + Assert.NotNull(parseMethod); - MethodInfo parseMethod = parserType.GetMethod("parse", flags); - Assert.NotNull(parseMethod); + byte[] bytes = "-ERR 'Authorization Timeout'\r\n".ToCharArray().Select(ch => (byte) ch).ToArray(); + parseMethod.Invoke(parser, new object[] {bytes, bytes.Length}); - byte[] bytes = "-ERR 'Authorization Timeout'\r\n".ToCharArray().Select(ch => (byte)ch).ToArray(); - parseMethod.Invoke(parser, new object[] { bytes, bytes.Length }); + // sleep to allow the client to process the error, then shutdown the server. + Thread.Sleep(250); - // sleep to allow the client to process the error, then shutdown the server. - Thread.Sleep(250); - s1.Shutdown(); + s1.Shutdown(); - // Wait for a reconnect. - Assert.True(ev.WaitOne(20000)); + // Wait for a reconnect. + Assert.True(ev.WaitOne(20000)); + } } } #endif diff --git a/src/Tests/IntegrationTests/TestBasic.cs b/src/Tests/IntegrationTests/TestBasic.cs index 3ee132a24..2f436dc67 100644 --- a/src/Tests/IntegrationTests/TestBasic.cs +++ b/src/Tests/IntegrationTests/TestBasic.cs @@ -36,18 +36,19 @@ public void TestConnectedServer() { using (NATSServer.CreateFastAndVerify()) { - IConnection c = Context.OpenConnection(); - - string u = c.ConnectedUrl; + using (var c = Context.OpenConnection()) + { + string u = c.ConnectedUrl; - Assert.False(string.IsNullOrWhiteSpace(u), string.Format("Invalid connected url {0}.", u)); + Assert.False(string.IsNullOrWhiteSpace(u), string.Format("Invalid connected url {0}.", u)); - Assert.Equal(Defaults.Url, u); + Assert.Equal(Defaults.Url, u); - c.Close(); - u = c.ConnectedUrl; + c.Close(); + u = c.ConnectedUrl; - Assert.Null(u); + Assert.Null(u); + } } } @@ -56,17 +57,17 @@ public void TestMultipleClose() { using (NATSServer.CreateFastAndVerify()) { - IConnection c = Context.OpenConnection(); - - Task[] tasks = new Task[10]; - - for (int i = 0; i < 10; i++) + using (var c = Context.OpenConnection()) { + Task[] tasks = new Task[10]; - tasks[i] = Task.Run(() => c.Close()); - } + for (int i = 0; i < 10; i++) + { + tasks[i] = Task.Run(() => c.Close()); + } - Task.WaitAll(tasks); + Task.WaitAll(tasks); + } } } @@ -303,31 +304,40 @@ public void TestFlush() var opts = Context.GetTestOptions(); opts.AllowReconnect = false; - var c = Context.ConnectionFactory.CreateConnection(opts); - - using (ISyncSubscription s = c.SubscribeSync("foo")) + using (var c = Context.ConnectionFactory.CreateConnection(opts)) { - c.Publish("foo", "reply", omsg); - c.Flush(); - } + using (ISyncSubscription s = c.SubscribeSync("foo")) + { + c.Publish("foo", "reply", omsg); + c.Flush(); + } - // Test a timeout, locally this may actually succeed, - // so allow for that. - // TODO: find a way to debug/pause the server to allow - // for timeouts. - try { c.Flush(1); } catch (NATSTimeoutException) {} + // Test a timeout, locally this may actually succeed, + // so allow for that. + // TODO: find a way to debug/pause the server to allow + // for timeouts. + try + { + c.Flush(1); + } + catch (NATSTimeoutException) + { + } - Assert.Throws(() => { c.Flush(-1); }); + Assert.Throws(() => { c.Flush(-1); }); - // test a closed connection - c.Close(); - Assert.Throws(() => { c.Flush(); }); + // test a closed connection + c.Close(); + Assert.Throws(() => { c.Flush(); }); + } // test a lost connection - c = Context.ConnectionFactory.CreateConnection(opts); - server.Shutdown(); - Thread.Sleep(500); - Assert.Throws(() => { c.Flush(); }); + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + server.Shutdown(); + Thread.Sleep(500); + Assert.Throws(() => { c.Flush(); }); + } } } @@ -798,36 +808,35 @@ private async void testRequestAsyncTimeout(bool useOldRequestStyle) var opts = Context.GetTestOptionsWithDefaultTimeout(); opts.AllowReconnect = false; opts.UseOldRequestStyle = useOldRequestStyle; - var conn = Context.ConnectionFactory.CreateConnection(opts); - - // success condition - var sub = conn.SubscribeAsync("foo", (obj, args) => { - conn.Publish(args.Message.Reply, new byte[0]); - }); - - sw.Start(); - await conn.RequestAsync("foo", new byte[0], 5000); - sw.Stop(); - Assert.True(sw.ElapsedMilliseconds < 5000, "Unexpected timeout behavior"); - sub.Unsubscribe(); - - // valid connection, but no response - sw.Restart(); - await Assert.ThrowsAsync(() => { return conn.RequestAsync("test", new byte[0], 500); }); - sw.Stop(); - long elapsed = sw.ElapsedMilliseconds; - Assert.True(elapsed >= 500, string.Format("Unexpected value (should be > 500): {0}", elapsed)); - long variance = elapsed - 500; + using (var conn = Context.ConnectionFactory.CreateConnection(opts)) + { + // success condition + using (var sub = conn.SubscribeAsync("foo", (obj, args) => { conn.Publish(args.Message.Reply, new byte[0]); })) + { + sw.Start(); + await conn.RequestAsync("foo", new byte[0], 5000); + sw.Stop(); + Assert.True(sw.ElapsedMilliseconds < 5000, "Unexpected timeout behavior"); + sub.Unsubscribe(); + } + + // valid connection, but no response + sw.Restart(); + await Assert.ThrowsAsync(() => { return conn.RequestAsync("test", new byte[0], 500); }); + sw.Stop(); + long elapsed = sw.ElapsedMilliseconds; + Assert.True(elapsed >= 500, string.Format("Unexpected value (should be > 500): {0}", elapsed)); + long variance = elapsed - 500; #if DEBUG - Assert.True(variance < 250, string.Format("Invalid timeout variance: {0}", variance)); + Assert.True(variance < 250, string.Format("Invalid timeout variance: {0}", variance)); #else Assert.True(variance < 100, string.Format("Invalid timeout variance: {0}", variance)); #endif - - // Test an invalid connection - server.Shutdown(); - Thread.Sleep(500); - await Assert.ThrowsAsync(() => { return conn.RequestAsync("test", new byte[0], 1000); }); + // Test an invalid connection + server.Shutdown(); + Thread.Sleep(500); + await Assert.ThrowsAsync(() => { return conn.RequestAsync("test", new byte[0], 1000); }); + } } } @@ -1094,16 +1103,17 @@ public void TestReleaseFlush() { using (NATSServer.CreateFastAndVerify()) { - IConnection c = Context.OpenConnection(); - - for (int i = 0; i < 1000; i++) + using (var c = Context.OpenConnection()) { - c.Publish("foo", Encoding.UTF8.GetBytes("Hello")); - } + for (int i = 0; i < 1000; i++) + { + c.Publish("foo", Encoding.UTF8.GetBytes("Hello")); + } - c.Flush(); + c.Flush(); - Task.Run(() => c.Close()); + Task.Run(() => c.Close()).Wait(); + } } } @@ -1156,21 +1166,25 @@ public void TestStats() c.ResetStats(); // Test both sync and async versions of subscribe. - IAsyncSubscription s1 = c.SubscribeAsync("foo"); - s1.MessageHandler += (sender, arg) => { }; - s1.Start(); + using (var s1 = c.SubscribeAsync("foo")) + { + s1.MessageHandler += (sender, arg) => { }; + s1.Start(); - ISyncSubscription s2 = c.SubscribeSync("foo"); + using (c.SubscribeSync("foo")) + { + for (int i = 0; i < iter; i++) + { + c.Publish("foo", data); + } - for (int i = 0; i < iter; i++) - { - c.Publish("foo", data); - } - c.Flush(1000); + c.Flush(1000); - stats = c.Stats; - Assert.Equal(2 * iter, stats.InMsgs); - Assert.Equal(2 * iter * data.Length, stats.InBytes); + stats = c.Stats; + Assert.Equal(2 * iter, stats.InMsgs); + Assert.Equal(2 * iter * data.Length, stats.InBytes); + } + } } } } @@ -1383,22 +1397,28 @@ public void TestUrlArgument() using (NATSServer.CreateFastAndVerify()) { - IConnection c = Context.ConnectionFactory.CreateConnection(urls); - Assert.Equal(c.Opts.Servers[0],url1); - Assert.Equal(c.Opts.Servers[1],url2); - Assert.Equal(c.Opts.Servers[2],url3); + using (var c = Context.ConnectionFactory.CreateConnection(urls)) + { + Assert.Equal(c.Opts.Servers[0], url1); + Assert.Equal(c.Opts.Servers[1], url2); + Assert.Equal(c.Opts.Servers[2], url3); - c.Close(); + c.Close(); + } urls = url1 + " , " + url2 + "," + url3; - c = Context.ConnectionFactory.CreateConnection(urls); - Assert.Equal(c.Opts.Servers[0],url1); - Assert.Equal(c.Opts.Servers[1],url2); - Assert.Equal(c.Opts.Servers[2],url3); - c.Close(); - - c = Context.ConnectionFactory.CreateConnection(url1); - c.Close(); + using (var c = Context.ConnectionFactory.CreateConnection(urls)) + { + Assert.Equal(c.Opts.Servers[0], url1); + Assert.Equal(c.Opts.Servers[1], url2); + Assert.Equal(c.Opts.Servers[2], url3); + c.Close(); + } + + using(var c = Context.ConnectionFactory.CreateConnection(url1)) + { + c.Close(); + } } } @@ -1432,30 +1452,36 @@ public void TestAsyncInfoProtocolConnect() var opts = Context.GetTestOptions(Context.Server3.Port); opts.NoRandomize = false; - var c = Context.ConnectionFactory.CreateConnection(opts); - Assert.True(assureClusterFormed(c, 7), - "Incomplete cluster with server count: " + c.Servers.Length); - c.Close(); + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + Assert.True(assureClusterFormed(c, 7), + "Incomplete cluster with server count: " + c.Servers.Length); + c.Close(); + } // Create a new connection to start from scratch, and recieve // the entire server list at once. - c = Context.ConnectionFactory.CreateConnection(opts); - Assert.True(assureClusterFormed(c, 7), - "Incomplete cluster with server count: " + c.Servers.Length); - - for (int i = 0; i < 50; i++) + using (var c = Context.ConnectionFactory.CreateConnection(opts)) { - var c2 = Context.ConnectionFactory.CreateConnection(opts); Assert.True(assureClusterFormed(c, 7), "Incomplete cluster with server count: " + c.Servers.Length); - // The first urls should be the same. - Assert.Equal(c.Servers[0],c2.Servers[0]); + for (int i = 0; i < 50; i++) + { + using (var c2 = Context.ConnectionFactory.CreateConnection(opts)) + { + Assert.True(assureClusterFormed(c, 7), + "Incomplete cluster with server count: " + c.Servers.Length); - c2.Close(); - } + // The first urls should be the same. + Assert.Equal(c.Servers[0], c2.Servers[0]); + + c2.Close(); + } + } - c.Close(); + c.Close(); + } } } @@ -1477,34 +1503,36 @@ public void TestAsyncInfoProtocolUpdate() // from being added - for adding servers, 127.0.0.1 matches localhost. using (NATSServer s1 = NATSServer.Create(Context.Server3.Port, $"-a localhost --cluster nats://127.0.0.1:{Context.ClusterServer5.Port} --routes nats://127.0.0.1:{Context.ClusterServer6.Port}")) { - var c = Context.ConnectionFactory.CreateConnection(opts); - - Assert.True(c.Servers.Length == 1); - // check that credentials are stripped. - Assert.Equal($"nats://127.0.0.1:{Context.Server3.Port}", c.Servers[0]); - - // build an independent cluster - using (NATSServer s2 = NATSServer.Create(Context.Server4.Port, $"-a localhost --cluster nats://127.0.0.1:{Context.ClusterServer6.Port} --routes nats://127.0.0.1:{Context.ClusterServer5.Port}")) + using (var c = Context.ConnectionFactory.CreateConnection(opts)) { - // wait until the servers are routed and the conn has the updated - // server list. - assureClusterFormed(c, 2); - - // Ensure the first server remains in place and has not been - // randomized. + Assert.True(c.Servers.Length == 1); + // check that credentials are stripped. Assert.Equal($"nats://127.0.0.1:{Context.Server3.Port}", c.Servers[0]); - Assert.True(c.Servers.Length == 2); - Assert.True(c.DiscoveredServers.Length == 1); + // build an independent cluster + using (NATSServer s2 = NATSServer.Create(Context.Server4.Port, + $"-a localhost --cluster nats://127.0.0.1:{Context.ClusterServer6.Port} --routes nats://127.0.0.1:{Context.ClusterServer5.Port}")) + { + // wait until the servers are routed and the conn has the updated + // server list. + assureClusterFormed(c, 2); + + // Ensure the first server remains in place and has not been + // randomized. + Assert.Equal($"nats://127.0.0.1:{Context.Server3.Port}", c.Servers[0]); + Assert.True(c.Servers.Length == 2); + Assert.True(c.DiscoveredServers.Length == 1); + + + // sanity check to ensure we can connect to another server. + s1.Shutdown(); + Assert.True(evReconnect.WaitOne(10000)); + Assert.True(newUrl != null); + Assert.Contains(Context.Server4.Port.ToString(), c.ConnectedUrl); + } - // sanity check to ensure we can connect to another server. - s1.Shutdown(); - Assert.True(evReconnect.WaitOne(10000)); - Assert.True(newUrl != null); - Assert.Contains(Context.Server4.Port.ToString(), c.ConnectedUrl); + c.Close(); } - - c.Close(); } } @@ -1526,40 +1554,41 @@ public void TestAsyncInfoProtocolPrune() s2 = NATSServer.Create(Context.Server2.Port, $"-a 127.0.0.1 --cluster nats://127.0.0.1:{Context.ClusterServer2.Port} --routes nats://127.0.0.1:{Context.ClusterServer1.Port}"), s3 = NATSServer.Create(Context.Server3.Port, $"-a 127.0.0.1 --cluster nats://127.0.0.1:{Context.ClusterServer3.Port} --routes nats://127.0.0.1:{Context.ClusterServer1.Port}")) { + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + Assert.True(assureClusterFormed(c, 3), "Incomplete cluster with server count: " + c.Servers.Length); + // shutdown server 2 + s2.Shutdown(); - var c = Context.ConnectionFactory.CreateConnection(opts); - Assert.True(assureClusterFormed(c, 3), - "Incomplete cluster with server count: " + c.Servers.Length); - - // shutdown server 2 - s2.Shutdown(); + using (NATSServer s4 = NATSServer.Create(Context.Server4.Port, + $"-a 127.0.0.1 --cluster nats://127.0.0.1:{Context.ClusterServer4.Port} --routes nats://127.0.0.1:{Context.ClusterServer1.Port}")) + { + // wait for the update with new server to check. + Assert.True(evDS.WaitOne(10000)); + + // The server on port 4223 should be pruned out. + // + // Discovered servers should contain: + // ["nats://127.0.0.1:4223", + // "nats://127.0.0.1:4224"] + // + LinkedList discoveredServers = new LinkedList(c.DiscoveredServers); + Assert.True(discoveredServers.Count == 2); + Assert.Contains($"nats://127.0.0.1:{Context.Server3.Port}", discoveredServers); + Assert.Contains($"nats://127.0.0.1:{Context.Server4.Port}", discoveredServers); + + // shutdown server 1 and wait for reconnect. + s1.Shutdown(); + Assert.True(evRC.WaitOne(10000)); + // Make sure we did NOT delete our expclitly configured server. + LinkedList servers = new LinkedList(c.Servers); + Assert.True(servers.Count == 3); // explicit server is still there. + Assert.Contains($"nats://127.0.0.1:{Context.Server1.Port}", servers); + } - using (NATSServer s4 = NATSServer.Create(Context.Server4.Port, $"-a 127.0.0.1 --cluster nats://127.0.0.1:{Context.ClusterServer4.Port} --routes nats://127.0.0.1:{Context.ClusterServer1.Port}")) - { - // wait for the update with new server to check. - Assert.True(evDS.WaitOne(10000)); - - // The server on port 4223 should be pruned out. - // - // Discovered servers should contain: - // ["nats://127.0.0.1:4223", - // "nats://127.0.0.1:4224"] - // - LinkedList discoveredServers = new LinkedList(c.DiscoveredServers); - Assert.True(discoveredServers.Count == 2); - Assert.Contains($"nats://127.0.0.1:{Context.Server3.Port}", discoveredServers); - Assert.Contains($"nats://127.0.0.1:{Context.Server4.Port}", discoveredServers); - - // shutdown server 1 and wait for reconnect. - s1.Shutdown(); - Assert.True(evRC.WaitOne(10000)); - // Make sure we did NOT delete our expclitly configured server. - LinkedList servers = new LinkedList(c.Servers); - Assert.True(servers.Count == 3); // explicit server is still there. - Assert.Contains($"nats://127.0.0.1:{Context.Server1.Port}", servers); + c.Close(); } - c.Close(); } } @@ -1596,17 +1625,21 @@ public void TestServersRandomize() using (NATSServer.CreateFastAndVerify()) { - var c = Context.ConnectionFactory.CreateConnection(opts); - Assert.True(listsEqual(serverList, c.Servers)); - c.Close(); + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + Assert.True(listsEqual(serverList, c.Servers)); + c.Close(); + } bool wasRandom = false; opts.NoRandomize = false; for (int i = 0; i < 10; i++) { - c = Context.ConnectionFactory.CreateConnection(opts); - wasRandom = (listsEqual(serverList, c.Servers) == false); - c.Close(); + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + wasRandom = (listsEqual(serverList, c.Servers) == false); + c.Close(); + } if (wasRandom) break; @@ -1622,11 +1655,12 @@ public void TestServersRandomize() opts.Url = $"nats://localhost:{Context.DefaultServer.Port}"; for (int i = 0; i < 5; i++) { - c = Context.ConnectionFactory.CreateConnection(opts); - wasRandom = (listsEqual(serverList, c.Servers) == false); - Assert.True(Equals(serverList[0], c.Servers[0])); - c.Close(); - + using(var c = Context.ConnectionFactory.CreateConnection(opts)) + { + wasRandom = (listsEqual(serverList, c.Servers) == false); + Assert.True(Equals(serverList[0], c.Servers[0])); + c.Close(); + } if (wasRandom) break; } @@ -1642,28 +1676,32 @@ public void TestSimpleUrlArgument() var o = ConnectionFactory.GetDefaultOptions(); // simple url connect - Context.ConnectionFactory.CreateConnection("127.0.0.1").Close(); + using (var cn = Context.ConnectionFactory.CreateConnection("127.0.0.1")) + cn.Close(); // simple url o.Url = "127.0.0.1"; - Context.ConnectionFactory.CreateConnection(o); + using(Context.ConnectionFactory.CreateConnection(o)){} // servers with a simple hostname o.Url = null; o.Servers = new string[] { "127.0.0.1" }; - Context.ConnectionFactory.CreateConnection(o).Close(); + using(var cn = Context.ConnectionFactory.CreateConnection(o)) + cn.Close(); // simple url connect - Context.ConnectionFactory.CreateConnection("127.0.0.1, localhost").Close(); + using(var cn = Context.ConnectionFactory.CreateConnection("127.0.0.1, localhost")) + cn.Close(); // url with multiple hosts o.Url = "127.0.0.1,localhost"; - Context.ConnectionFactory.CreateConnection(o); + using (Context.ConnectionFactory.CreateConnection(o)) {} // servers with multiple hosts o.Url = null; o.Servers = new string[] { "127.0.0.1", "localhost" }; - Context.ConnectionFactory.CreateConnection(o).Close(); + using(var cn = Context.ConnectionFactory.CreateConnection(o)) + cn.Close(); } } @@ -1676,19 +1714,18 @@ public void TestNoEcho() var o = ConnectionFactory.GetDefaultOptions(); o.NoEcho = true; - var c = Context.ConnectionFactory.CreateConnection(o); - - c.SubscribeAsync("foo", (obj, args) => + using (var c = Context.ConnectionFactory.CreateConnection(o)) { - Interlocked.Increment(ref received); - }); - - c.Publish("foo", null); - c.Flush(); + using (c.SubscribeAsync("foo", (obj, args) => { Interlocked.Increment(ref received); })) + { + c.Publish("foo", null); + c.Flush(); - // hate sleeping, but with slow CI's, we need to give time to - //make sure that message never arrives. - Thread.Sleep(1000); + // hate sleeping, but with slow CI's, we need to give time to + //make sure that message never arrives. + Thread.Sleep(1000); + } + } Assert.True(Interlocked.Read(ref received) == 0); } @@ -1697,8 +1734,6 @@ public void TestNoEcho() [Fact] public void TestServersOption() { - Options o = Context.GetTestOptions(); - Assert.ThrowsAny(() => Context.ConnectionFactory.CreateConnection()); } diff --git a/src/Tests/IntegrationTests/TestCluster.cs b/src/Tests/IntegrationTests/TestCluster.cs index cf3eb6ddc..32110b631 100644 --- a/src/Tests/IntegrationTests/TestCluster.cs +++ b/src/Tests/IntegrationTests/TestCluster.cs @@ -31,7 +31,6 @@ public TestCluster(ClusterSuiteContext context) : base(context) { } [Fact] public void TestServersOption() { - IConnection c = null; ConnectionFactory cf = Context.ConnectionFactory; Options o = Context.GetTestOptions(); o.NoRandomize = true; @@ -42,17 +41,21 @@ public void TestServersOption() // Make sure we can connect to first server if running using (NATSServer ns = NATSServer.Create(Context.Server1.Port)) { - c = cf.CreateConnection(o); - Assert.Equal(Context.Server1.Url, c.ConnectedUrl); - c.Close(); + using (var c = cf.CreateConnection(o)) + { + Assert.Equal(Context.Server1.Url, c.ConnectedUrl); + c.Close(); + } } // make sure we can connect to a non-first server. - using (NATSServer ns = NATSServer.Create(Context.Server6.Port)) + using (NATSServer.Create(Context.Server6.Port)) { - c = cf.CreateConnection(o); - Assert.Equal(Context.Server6.Url, c.ConnectedUrl); - c.Close(); + using (var c = cf.CreateConnection(o)) + { + Assert.Equal(Context.Server6.Url, c.ConnectedUrl); + c.Close(); + } } } @@ -157,7 +160,6 @@ public void TestBasicClusterReconnect() [Fact] public void TestServerDiscoveredHandler() { - IConnection c = null; ConnectionFactory cf = Context.ConnectionFactory; Options o = Context.GetTestOptions(); @@ -177,7 +179,7 @@ public void TestServerDiscoveredHandler() using (NATSServer ns1 = NATSServer.Create(seedServerArgs)) { // ...then connect to it... - using (c = cf.CreateConnection(o)) + using (var c = cf.CreateConnection(o)) { Assert.Equal(Context.Server1.Url,c.ConnectedUrl); @@ -320,24 +322,24 @@ public void TestProperReconnectDelay() using (NATSServer s1 = NATSServer.Create(Context.Server1.Port)) { - IConnection c = Context.ConnectionFactory.CreateConnection(opts); - - lock (mu) + using (var c = Context.ConnectionFactory.CreateConnection(opts)) { - s1.Shutdown(); - // wait for disconnect - Assert.True(Monitor.Wait(mu, 10000)); + lock (mu) + { + s1.Shutdown(); + // wait for disconnect + Assert.True(Monitor.Wait(mu, 10000)); - // Wait, want to make sure we don't spin on - //reconnect to non-existant servers. - Thread.Sleep(1000); + // Wait, want to make sure we don't spin on + //reconnect to non-existant servers. + Thread.Sleep(1000); - Assert.False(closedCbCalled); - Assert.True(disconnectHandlerCalled); - Assert.True(c.State == ConnState.RECONNECTING); + Assert.False(closedCbCalled); + Assert.True(disconnectHandlerCalled); + Assert.True(c.State == ConnState.RECONNECTING); + } } - } } diff --git a/src/Tests/IntegrationTests/TestConnection.cs b/src/Tests/IntegrationTests/TestConnection.cs index 87b8a8b85..2596bb764 100644 --- a/src/Tests/IntegrationTests/TestConnection.cs +++ b/src/Tests/IntegrationTests/TestConnection.cs @@ -31,10 +31,12 @@ public void TestConnectionStatus() { using (NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - IConnection c = Context.OpenConnection(Context.Server1.Port); - Assert.Equal(ConnState.CONNECTED, c.State); - c.Close(); - Assert.Equal(ConnState.CLOSED, c.State); + using (var c = Context.OpenConnection(Context.Server1.Port)) + { + Assert.Equal(ConnState.CONNECTED, c.State); + c.Close(); + Assert.Equal(ConnState.CLOSED, c.State); + } } } @@ -49,13 +51,16 @@ public void TestCloseHandler() { ev.Set(); }; - IConnection c = Context.ConnectionFactory.CreateConnection(o); - c.Close(); - Assert.True(ev.WaitOne(1000)); + using (var c = Context.ConnectionFactory.CreateConnection(o)) + { + c.Close(); + Assert.True(ev.WaitOne(1000)); + } // now test using. ev.Reset(); - using (c = Context.ConnectionFactory.CreateConnection(o)) { }; + using (var c = Context.ConnectionFactory.CreateConnection(o)) { } + Assert.True(ev.WaitOne(1000)); } } @@ -79,19 +84,22 @@ public void TestCloseDisconnectedHandler() } }; - IConnection c = Context.ConnectionFactory.CreateConnection(o); - lock (mu) + using (var c = Context.ConnectionFactory.CreateConnection(o)) { - c.Close(); - Monitor.Wait(mu, 20000); + lock (mu) + { + c.Close(); + Monitor.Wait(mu, 20000); + } + + Assert.True(disconnected); } - Assert.True(disconnected); // now test using. disconnected = false; lock (mu) { - using (c = Context.ConnectionFactory.CreateConnection(o)) { }; + using (Context.ConnectionFactory.CreateConnection(o)) { } Monitor.Wait(mu, 20000); } Assert.True(disconnected); @@ -123,7 +131,7 @@ public void TestErrorHandlerWhenNotAllowingReconnectErrorShouldBeProvided() using (var s = NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - using (var cn = Context.ConnectionFactory.CreateConnection(opts)) + using (Context.ConnectionFactory.CreateConnection(opts)) { s.Bounce(1000); } @@ -168,7 +176,7 @@ public void TestErrorHandlerWhenAllowingReconnectErrorShouldNotBeProvided() using (var s = NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - using (var cn = Context.ConnectionFactory.CreateConnection(opts)) + using (Context.ConnectionFactory.CreateConnection(opts)) { s.Bounce(1000); Assert.True(reconEv.WaitOne(1000)); @@ -194,12 +202,14 @@ public void TestServerStopDisconnectedHandler() ev.Set(); }; - IConnection c = Context.ConnectionFactory.CreateConnection(o); - s.Bounce(1000); + using (var c = Context.ConnectionFactory.CreateConnection(o)) + { + s.Bounce(1000); - Assert.True(ev.WaitOne(10000)); + Assert.True(ev.WaitOne(10000)); - c.Close(); + c.Close(); + } } } @@ -208,34 +218,37 @@ public void TestClosedConnections() { using (NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - IConnection c = Context.OpenConnection(Context.Server1.Port); - ISyncSubscription s = c.SubscribeSync("foo"); - - c.Close(); + using (var c = Context.OpenConnection(Context.Server1.Port)) + { + using(var s = c.SubscribeSync("foo")) + { + c.Close(); - // While we can annotate all the exceptions in the test framework, - // just do it manually. - Assert.ThrowsAny(() => c.Publish("foo", null)); + // While we can annotate all the exceptions in the test framework, + // just do it manually. + Assert.ThrowsAny(() => c.Publish("foo", null)); - Assert.ThrowsAny(() => c.Publish(new Msg("foo"))); + Assert.ThrowsAny(() => c.Publish(new Msg("foo"))); - Assert.ThrowsAny(() => c.SubscribeAsync("foo")); + Assert.ThrowsAny(() => c.SubscribeAsync("foo")); - Assert.ThrowsAny(() => c.SubscribeSync("foo")); + Assert.ThrowsAny(() => c.SubscribeSync("foo")); - Assert.ThrowsAny(() => c.SubscribeAsync("foo", "bar")); + Assert.ThrowsAny(() => c.SubscribeAsync("foo", "bar")); - Assert.ThrowsAny(() => c.SubscribeSync("foo", "bar")); + Assert.ThrowsAny(() => c.SubscribeSync("foo", "bar")); - Assert.ThrowsAny(() => c.Request("foo", null)); + Assert.ThrowsAny(() => c.Request("foo", null)); - Assert.ThrowsAny(() => s.NextMessage()); + Assert.ThrowsAny(() => s.NextMessage()); - Assert.ThrowsAny(() => s.NextMessage(100)); + Assert.ThrowsAny(() => s.NextMessage(100)); - Assert.ThrowsAny(() => s.Unsubscribe()); + Assert.ThrowsAny(() => s.Unsubscribe()); - Assert.ThrowsAny(() => s.AutoUnsubscribe(1)); + Assert.ThrowsAny(() => s.AutoUnsubscribe(1)); + } + } } } @@ -247,8 +260,8 @@ public void TestConnectVerbose() var o = Context.GetTestOptions(Context.Server1.Port); o.Verbose = true; - IConnection c = Context.ConnectionFactory.CreateConnection(o); - c.Close(); + using(var c = Context.ConnectionFactory.CreateConnection(o)) + c.Close(); } } @@ -266,8 +279,8 @@ public void TestServerDiscoveredHandlerNotCalledOnConnect() serverDiscoveredCalled = true; }; - IConnection c = Context.ConnectionFactory.CreateConnection(o); - c.Close(); + using(var c = Context.ConnectionFactory.CreateConnection(o)) + c.Close(); Assert.False(serverDiscoveredCalled); } @@ -381,34 +394,37 @@ public void TestCallbacksOrder() } }; - IAsyncSubscription sub1 = nc.SubscribeAsync("foo", eh); - IAsyncSubscription sub2 = nc.SubscribeAsync("bar", eh); - nc.Flush(); - - ncp.Publish("foo", System.Text.Encoding.UTF8.GetBytes("hello")); - ncp.Publish("bar", System.Text.Encoding.UTF8.GetBytes("hello")); - ncp.Flush(); - - recvCh.WaitOne(3000); - - for (int i = 0; i < 3; i++) + using (IAsyncSubscription + sub1 = nc.SubscribeAsync("foo", eh), + sub2 = nc.SubscribeAsync("bar", eh)) { + nc.Flush(); + ncp.Publish("foo", System.Text.Encoding.UTF8.GetBytes("hello")); ncp.Publish("bar", System.Text.Encoding.UTF8.GetBytes("hello")); - } + ncp.Flush(); - ncp.Flush(); + recvCh.WaitOne(3000); - Assert.True(asyncErr1.WaitOne(3000)); - Assert.True(asyncErr2.WaitOne(3000)); + for (int i = 0; i < 3; i++) + { + ncp.Publish("foo", System.Text.Encoding.UTF8.GetBytes("hello")); + ncp.Publish("bar", System.Text.Encoding.UTF8.GetBytes("hello")); + } - serverNoAuth.Shutdown(); + ncp.Flush(); - Thread.Sleep(1000); - closed.Reset(); - nc.Close(); + Assert.True(asyncErr1.WaitOne(3000)); + Assert.True(asyncErr2.WaitOne(3000)); - Assert.True(closed.WaitOne(3000)); + serverNoAuth.Shutdown(); + + Thread.Sleep(1000); + closed.Reset(); + nc.Close(); + + Assert.True(closed.WaitOne(3000)); + } } @@ -473,11 +489,14 @@ public void TestGenerateUniqueInboxNames() for (var i = 0; i < 1000; i++) { - IConnection c = Context.OpenConnection(Context.Server1.Port); - var inboxName = c.NewInbox(); - c.Close(); - Assert.NotEqual(inboxName, lastInboxName); - lastInboxName = inboxName; + using (var c = Context.OpenConnection(Context.Server1.Port)) + { + var inboxName = c.NewInbox(); + c.Close(); + + Assert.NotEqual(inboxName, lastInboxName); + lastInboxName = inboxName; + } } } } @@ -569,7 +588,8 @@ public void TestNKey() // See nkey.conf opts.SetNkey("UCKKTOZV72L3NITTGNOCRDZUI5H632XCT4ZWPJBC2X3VEY72KJUWEZ2Z", "./config/certs/user.nk"); - Context.ConnectionFactory.CreateConnection(opts).Close(); + using(var cn = Context.ConnectionFactory.CreateConnection(opts)) + cn.Close(); } } @@ -581,8 +601,11 @@ public void TestInvalidNKey() var opts = Context.GetTestOptionsWithDefaultTimeout(Context.Server1.Port); opts.SetNkey("XXKKTOZV72L3NITTGNOCRDZUI5H632XCT4ZWPJBC2X3VEY72KJUWEZ2Z", "./config/certs/user.nk"); - Assert.Throws(()=> Context.ConnectionFactory.CreateConnection(opts).Close()); - + Assert.Throws(()=> + { + using(var cn = Context.ConnectionFactory.CreateConnection(opts)) + cn.Close(); + }); Assert.Throws(() => opts.SetNkey("", "./config/certs/user.nk")); Assert.Throws(() => opts.SetNkey("UCKKTOZV72L3NITTGNOCRDZUI5H632XCT4ZWPJBC2X3VEY72KJUWEZ2Z", "")); @@ -592,23 +615,25 @@ public void TestInvalidNKey() [Fact] public void Test20Security() { - IConnection c = null; AutoResetEvent ev = new AutoResetEvent(false); - using (NATSServer.CreateWithConfig(Context.Server1.Port, "operator.conf")) + using (var s1 = NATSServer.CreateWithConfig(Context.Server1.Port, "operator.conf")) { var opts = Context.GetTestOptionsWithDefaultTimeout(Context.Server1.Port); opts.ReconnectedEventHandler += (obj, args) => { ev.Set(); }; opts.SetUserCredentials("./config/certs/test.creds"); - c = Context.ConnectionFactory.CreateConnection(opts); - } + using (Context.ConnectionFactory.CreateConnection(opts)) + { + s1.Shutdown(); - // effectively bounce the server - using (NATSServer.CreateWithConfig(Context.Server1.Port, "operator.conf")) - { - // wait for reconnect. - Assert.True(ev.WaitOne(60000)); + // effectively bounce the server + using (NATSServer.CreateWithConfig(Context.Server1.Port, "operator.conf")) + { + // wait for reconnect. + Assert.True(ev.WaitOne(60000)); + } + } } } @@ -618,8 +643,10 @@ public void Test20SecurityFactoryApi() using (NATSServer.CreateWithConfig(Context.Server1.Port, "operator.conf")) { var serverUrl = Context.Server1.Url; - Context.ConnectionFactory.CreateConnection(serverUrl, "./config/certs/test.creds").Close(); - Context.ConnectionFactory.CreateConnection(serverUrl, "./config/certs/test.creds", "./config/certs/test.creds").Close(); + using(var cn = Context.ConnectionFactory.CreateConnection(serverUrl, "./config/certs/test.creds")) + cn.Close(); + using(var cn = Context.ConnectionFactory.CreateConnection(serverUrl, "./config/certs/test.creds", "./config/certs/test.creds")) + cn.Close(); Assert.Throws(() => Context.ConnectionFactory.CreateConnection(serverUrl, "")); Assert.Throws(() => Context.ConnectionFactory.CreateConnection(serverUrl, null)); @@ -715,7 +742,8 @@ public void Test20SecurityHandlers() }; var opts = Context.GetTestOptionsWithDefaultTimeout(Context.Server1.Port); opts.SetUserCredentialHandlers(jwtEh, sigEh); - Context.ConnectionFactory.CreateConnection(opts).Close(); + using(var cn = Context.ConnectionFactory.CreateConnection(opts)) + cn.Close(); } } @@ -727,28 +755,28 @@ public void TestUserPassTokenOptions() var opts = Context.GetTestOptions(Context.Server1.Port); opts.Token = "foo"; - var c = Context.ConnectionFactory.CreateConnection(opts); - c.Close(); + using(var c = Context.ConnectionFactory.CreateConnection(opts)) + c.Close(); opts.Token = "garbage"; - Assert.Throws(() => { Context.ConnectionFactory.CreateConnection(opts); }); + Assert.Throws(() => Context.ConnectionFactory.CreateConnection(opts)); } - using (NATSServer.Create(Context.Server1.Port, $"--user foo --pass b@r")) + using (NATSServer.Create(Context.Server1.Port, "--user foo --pass b@r")) { var opts = Context.GetTestOptions(Context.Server1.Port); opts.User = "foo"; opts.Password = "b@r"; - var c = Context.ConnectionFactory.CreateConnection(opts); - c.Close(); + using(var c = Context.ConnectionFactory.CreateConnection(opts)) + c.Close(); opts.Password = "garbage"; - Assert.Throws(() => { Context.ConnectionFactory.CreateConnection(opts); }); + Assert.Throws(() => Context.ConnectionFactory.CreateConnection(opts)); opts.User = "baz"; opts.Password = "bar"; - Assert.Throws(() => { Context.ConnectionFactory.CreateConnection(opts); }); + Assert.Throws(() => Context.ConnectionFactory.CreateConnection(opts)); } } @@ -768,32 +796,35 @@ public void TestDrain() { using (NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - var c = Context.OpenConnection(Context.Server1.Port); - - AutoResetEvent done = new AutoResetEvent(false); - int received = 0; - int expected = 10; - - var s = c.SubscribeAsync("foo", (obj, args) => + using (var c = Context.OpenConnection(Context.Server1.Port)) { - // allow messages to back up - Thread.Sleep(100); + AutoResetEvent done = new AutoResetEvent(false); + int received = 0; + int expected = 10; - int count = Interlocked.Increment(ref received); - if (count == expected) + using (c.SubscribeAsync("foo", (obj, args) => { - done.Set(); - } - }); + // allow messages to back up + Thread.Sleep(100); - for (int i = 0; i < expected; i++) - { - c.Publish("foo", null); - } - c.Drain(); + int count = Interlocked.Increment(ref received); + if (count == expected) + { + done.Set(); + } + })) + { + for (int i = 0; i < expected; i++) + { + c.Publish("foo", null); + } + + c.Drain(); - done.WaitOne(5000); - Assert.True(received == expected, string.Format("received {0} of {1}", received, expected)); + done.WaitOne(5000); + Assert.True(received == expected, string.Format("received {0} of {1}", received, expected)); + } + } } } @@ -802,38 +833,41 @@ public void TestDrainAsync() { using (NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - var c = Context.OpenConnection(Context.Server1.Port); - - AutoResetEvent done = new AutoResetEvent(false); - int received = 0; - int expected = 10; - - var s = c.SubscribeAsync("foo", (obj, args) => + using (var c = Context.OpenConnection(Context.Server1.Port)) { - // allow messages to back up - Thread.Sleep(250); + AutoResetEvent done = new AutoResetEvent(false); + int received = 0; + int expected = 10; - int count = Interlocked.Increment(ref received); - if (count == expected) + using (c.SubscribeAsync("foo", (obj, args) => { - done.Set(); - } - }); + // allow messages to back up + Thread.Sleep(250); - for (int i = 0; i < expected; i++) - { - c.Publish("foo", null); - } - var sw = Stopwatch.StartNew(); - var t = c.DrainAsync(); - sw.Stop(); + int count = Interlocked.Increment(ref received); + if (count == expected) + { + done.Set(); + } + })) + { + for (int i = 0; i < expected; i++) + { + c.Publish("foo", null); + } - // are we really async? - Assert.True(sw.ElapsedMilliseconds < 2500); - t.Wait(); + var sw = Stopwatch.StartNew(); + var t = c.DrainAsync(); + sw.Stop(); - done.WaitOne(5000); - Assert.True(received == expected, string.Format("received {0} of {1}", received, expected)); + // are we really async? + Assert.True(sw.ElapsedMilliseconds < 2500); + t.Wait(); + + done.WaitOne(5000); + Assert.True(received == expected, string.Format("received {0} of {1}", received, expected)); + } + } } } @@ -842,32 +876,35 @@ public void TestDrainSub() { using (NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - var c = Context.OpenConnection(Context.Server1.Port); - - AutoResetEvent done = new AutoResetEvent(false); - int received = 0; - int expected = 10; - - var s = c.SubscribeAsync("foo", (obj, args) => + using (var c = Context.OpenConnection(Context.Server1.Port)) { - // allow messages to back up - Thread.Sleep(100); + AutoResetEvent done = new AutoResetEvent(false); + int received = 0; + int expected = 10; - int count = Interlocked.Increment(ref received); - if (count == expected) + using (var s = c.SubscribeAsync("foo", (obj, args) => { - done.Set(); - } - }); + // allow messages to back up + Thread.Sleep(100); - for (int i = 0; i < expected; i++) - { - c.Publish("foo", null); - } - s.Drain(); + int count = Interlocked.Increment(ref received); + if (count == expected) + { + done.Set(); + } + })) + { + for (int i = 0; i < expected; i++) + { + c.Publish("foo", null); + } - done.WaitOne(5000); - Assert.True(received == expected, string.Format("received {0} of {1}", received, expected)); + s.Drain(); + + done.WaitOne(5000); + Assert.True(received == expected, string.Format("received {0} of {1}", received, expected)); + } + } } } @@ -876,38 +913,41 @@ public void TestDrainSubAsync() { using (NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - var c = Context.OpenConnection(Context.Server1.Port); - - AutoResetEvent done = new AutoResetEvent(false); - int received = 0; - int expected = 10; - - var s = c.SubscribeAsync("foo", (obj, args) => + using (var c = Context.OpenConnection(Context.Server1.Port)) { - // allow messages to back up - Thread.Sleep(100); + AutoResetEvent done = new AutoResetEvent(false); + int received = 0; + int expected = 10; - int count = Interlocked.Increment(ref received); - if (count == expected) + using (var s = c.SubscribeAsync("foo", (obj, args) => { - done.Set(); - } - }); + // allow messages to back up + Thread.Sleep(100); - for (int i = 0; i < expected; i++) - { - c.Publish("foo", null); - } - var sw = Stopwatch.StartNew(); - var t = s.DrainAsync(); - sw.Stop(); + int count = Interlocked.Increment(ref received); + if (count == expected) + { + done.Set(); + } + })) + { + for (int i = 0; i < expected; i++) + { + c.Publish("foo", null); + } + + var sw = Stopwatch.StartNew(); + var t = s.DrainAsync(); + sw.Stop(); - // are we really async? - Assert.True(sw.ElapsedMilliseconds < 1000); - t.Wait(); + // are we really async? + Assert.True(sw.ElapsedMilliseconds < 1000); + t.Wait(); - done.WaitOne(5000); - Assert.True(received == expected, string.Format("received {0} of {1}", received, expected)); + done.WaitOne(5000); + Assert.True(received == expected, string.Format("received {0} of {1}", received, expected)); + } + } } } @@ -916,16 +956,20 @@ public void TestDrainBadParams() { using (NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - var c = Context.OpenConnection(Context.Server1.Port); - var s = c.SubscribeAsync("foo"); - Assert.ThrowsAsync(() => c.DrainAsync(-1)); - Assert.ThrowsAsync(() => c.DrainAsync(0)); - Assert.ThrowsAsync(() => s.DrainAsync(-1)); - Assert.ThrowsAsync(() => s.DrainAsync(0)); - Assert.Throws(() => c.Drain(-1)); - Assert.Throws(() => c.Drain(0)); - Assert.Throws(() => s.Drain(-1)); - Assert.Throws(() => s.Drain(0)); + using (var c = Context.OpenConnection(Context.Server1.Port)) + { + using (var s = c.SubscribeAsync("foo")) + { + Assert.ThrowsAsync(() => c.DrainAsync(-1)); + Assert.ThrowsAsync(() => c.DrainAsync(0)); + Assert.ThrowsAsync(() => s.DrainAsync(-1)); + Assert.ThrowsAsync(() => s.DrainAsync(0)); + Assert.Throws(() => c.Drain(-1)); + Assert.Throws(() => c.Drain(0)); + Assert.Throws(() => s.Drain(-1)); + Assert.Throws(() => s.Drain(0)); + } + } } } @@ -934,31 +978,36 @@ public void TestDrainTimeoutAsync() { using (NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - var c = Context.OpenConnection(Context.Server1.Port); - var s = c.SubscribeAsync("foo", (obj, args) => + using (var c = Context.OpenConnection(Context.Server1.Port)) { - // allow about 30s of messages to back up - Thread.Sleep(1000); - }); + using (c.SubscribeAsync("foo", (obj, args) => + { + // allow about 30s of messages to back up + Thread.Sleep(1000); + })) + { + for (int i = 0; i < 30; i++) + { + c.Publish("foo", null); + } - for (int i = 0; i < 30; i++) - { - c.Publish("foo", null); - } - Stopwatch sw = Stopwatch.StartNew(); - var t = c.DrainAsync(1000); - try - { - t.Wait(); - } - catch (Exception) - { - // timed out. - } - sw.Stop(); + Stopwatch sw = Stopwatch.StartNew(); + var t = c.DrainAsync(1000); + try + { + t.Wait(); + } + catch (Exception) + { + // timed out. + } - // add slack for slow CI. - Assert.True(sw.ElapsedMilliseconds >= 1000); + sw.Stop(); + + // add slack for slow CI. + Assert.True(sw.ElapsedMilliseconds >= 1000); + } + } } } @@ -967,23 +1016,27 @@ public void TestDrainBlocking() { using (NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - var c = Context.OpenConnection(Context.Server1.Port); - var s = c.SubscribeAsync("foo", (obj, args) => + using (var c = Context.OpenConnection(Context.Server1.Port)) { - // allow about 30s of messages to back up - Thread.Sleep(100); - }); + using (c.SubscribeAsync("foo", (obj, args) => + { + // allow about 30s of messages to back up + Thread.Sleep(100); + })) + { + for (int i = 0; i < 30; i++) + { + c.Publish("foo", null); + } - for (int i = 0; i < 30; i++) - { - c.Publish("foo", null); - } - Stopwatch sw = Stopwatch.StartNew(); - Assert.Throws(() => c.Drain(500)); - sw.Stop(); + Stopwatch sw = Stopwatch.StartNew(); + Assert.Throws(() => c.Drain(500)); + sw.Stop(); - // add slack for slow CI. - Assert.True(sw.ElapsedMilliseconds >= 500); + // add slack for slow CI. + Assert.True(sw.ElapsedMilliseconds >= 500); + } + } } } @@ -1002,25 +1055,29 @@ public void TestSubDrainBlockingTimeout() ev.Set(); }; - var c = Context.ConnectionFactory.CreateConnection(opts); - var s = c.SubscribeAsync("foo", (obj, args) => + using (var c = Context.ConnectionFactory.CreateConnection(opts)) { - // allow about 30s of messages to back up - Thread.Sleep(1000); - }); + using (var s = c.SubscribeAsync("foo", (obj, args) => + { + // allow about 30s of messages to back up + Thread.Sleep(1000); + })) + { + for (int i = 0; i < 30; i++) + { + c.Publish("foo", null); + } - for (int i = 0; i < 30; i++) - { - c.Publish("foo", null); + Stopwatch sw = Stopwatch.StartNew(); + Assert.Throws(() => s.Drain(500)); + sw.Stop(); + + // add slack for slow CI. + Assert.True(sw.ElapsedMilliseconds >= 500); + Assert.True(ev.WaitOne(4000)); + Assert.True(aehHit); + } } - Stopwatch sw = Stopwatch.StartNew(); - Assert.Throws(() => s.Drain(500)); - sw.Stop(); - - // add slack for slow CI. - Assert.True(sw.ElapsedMilliseconds >= 500); - Assert.True(ev.WaitOne(4000)); - Assert.True(aehHit); } } @@ -1036,47 +1093,48 @@ public async Task TestDrainStateBehavior() { closed.Set(); }; - var c = Context.ConnectionFactory.CreateConnection(opts); - var s = c.SubscribeAsync("foo", (obj, args) => + using (var c = Context.ConnectionFactory.CreateConnection(opts)) { - // allow about 5s of messages to back up - Thread.Sleep(500); - }); + using (c.SubscribeAsync("foo", (obj, args) => + { + // allow about 5s of messages to back up + Thread.Sleep(500); + })) + { + for (int i = 0; i < 10; i++) + { + c.Publish("foo", null); + } - for (int i = 0; i < 10; i++) - { - c.Publish("foo", null); - } - // give us a long timeout to run our test. - var drainTask = c.DrainAsync(10000); + // give us a long timeout to run our test. + var drainTask = c.DrainAsync(10000); - Assert.True(c.State == ConnState.DRAINING_SUBS); - Assert.True(c.IsDraining()); + Assert.True(c.State == ConnState.DRAINING_SUBS); + Assert.True(c.IsDraining()); - Assert.Throws(() => c.SubscribeAsync("foo")); - Assert.Throws(() => c.SubscribeSync("foo")); + Assert.Throws(() => c.SubscribeAsync("foo")); + Assert.Throws(() => c.SubscribeSync("foo")); - await drainTask; + await drainTask; - Assert.Equal(ConnState.CLOSED, c.State); - Assert.False(c.IsDraining()); + Assert.Equal(ConnState.CLOSED, c.State); + Assert.False(c.IsDraining()); - // Make sure we hit connection closed. - Assert.True(closed.WaitOne(10000)); + // Make sure we hit connection closed. + Assert.True(closed.WaitOne(10000)); + } + } } } [Fact] public void TestFlushBuffer() { - IConnection c = null; AutoResetEvent disconnected = new AutoResetEvent(false); AutoResetEvent closed = new AutoResetEvent(false); - using (NATSServer.CreateFastAndVerify(Context.Server1.Port)) + using (var s1 = NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - - var opts = Context.GetTestOptionsWithDefaultTimeout(Context.Server1.Port); opts.ClosedEventHandler = (obj, args) => { @@ -1087,32 +1145,33 @@ public void TestFlushBuffer() disconnected.Set(); }; - c = Context.ConnectionFactory.CreateConnection(opts); - - // test empty buffer - c.FlushBuffer(); - // test multiple calls - c.FlushBuffer(); - - c.Publish("foo", new byte[10240]); - c.FlushBuffer(); + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + // test empty buffer + c.FlushBuffer(); + // test multiple calls + c.FlushBuffer(); + + c.Publish("foo", new byte[10240]); + c.FlushBuffer(); + + s1.Shutdown(); + + // wait until we're disconnected + Assert.True(disconnected.WaitOne(10000)); + + // Be sure we're reconnecting + Assert.True(c.State == ConnState.RECONNECTING); + + // should be a NOOP + c.FlushBuffer(); + + // close and then check the closed connection. + c.Close(); + Assert.True(closed.WaitOne(10000)); + Assert.Throws(() => c.FlushBuffer()); + } } - - // wait until we're disconnected - Assert.True(disconnected.WaitOne(10000)); - - // Be sure we're reconnecting - Assert.True(c.State == ConnState.RECONNECTING); - - // should be a NOOP - c.FlushBuffer(); - - // close and then check the closed connection. - c.Close(); - Assert.True(closed.WaitOne(10000)); - Assert.Throws(() => c.FlushBuffer()); - - c.Dispose(); } } @@ -1172,21 +1231,25 @@ public void TestConnectionSubscriberMemoryLeak() count++; var opts = Context.GetTestOptionsWithDefaultTimeout(Context.Server2.Port); using (IConnection conn = Context.ConnectionFactory.CreateConnection(opts)) { - conn.SubscribeAsync("foo", (obj, args) => + using(conn.SubscribeAsync("foo", (obj, args) => { // NOOP - }); - - var sub = conn.SubscribeAsync("foo"); - sub.MessageHandler += (obj, args) => + })) { - // NOOP - }; - sub.Start(); - - conn.SubscribeSync("foo"); - - conn.Close(); + using(var sub = conn.SubscribeAsync("foo")) + { + sub.MessageHandler += (obj, args) => + { + // NOOP + }; + sub.Start(); + + using(conn.SubscribeSync("foo")) + { + conn.Close(); + } + } + } } } @@ -1213,45 +1276,46 @@ public void TestMemoryLeakRequestReplyAsync() var startMem = GC.GetTotalMemory(true); - c.SubscribeAsync(subject, (sender, args) => + using(c.SubscribeAsync(subject, (sender, args) => { c.Publish(args.Message.Reply, data); c.Flush(); - }); - - for (int i = 0; i < 100; i++) + })) { - var msg = c.Request(subject, data, int.MaxValue); - } - GC.Collect(); - Thread.Sleep(5000); + for (int i = 0; i < 100; i++) + { + var msg = c.Request(subject, data, int.MaxValue); + } + GC.Collect(); + Thread.Sleep(5000); - double memGrowthPercent = 100 * (((double)(GC.GetTotalMemory(false) - startMem)) / (double)startMem); - Assert.True(memGrowthPercent < 30.0, string.Format("Memory grew {0} percent.", memGrowthPercent)); + double memGrowthPercent = 100 * (((double)(GC.GetTotalMemory(false) - startMem)) / (double)startMem); + Assert.True(memGrowthPercent < 30.0, string.Format("Memory grew {0} percent.", memGrowthPercent)); - startMem = GC.GetTotalMemory(true); - for (int i = 0; i < 100; i++) - { - c.Request(subject, data); - } - GC.Collect(); - Thread.Sleep(5000); + startMem = GC.GetTotalMemory(true); + for (int i = 0; i < 100; i++) + { + c.Request(subject, data); + } + GC.Collect(); + Thread.Sleep(5000); - memGrowthPercent = 100 * (((double)(GC.GetTotalMemory(false) - startMem)) / (double)startMem); - Assert.True(memGrowthPercent < 30.0, string.Format("Memory grew {0} percent.", memGrowthPercent)); + memGrowthPercent = 100 * (((double)(GC.GetTotalMemory(false) - startMem)) / (double)startMem); + Assert.True(memGrowthPercent < 30.0, string.Format("Memory grew {0} percent.", memGrowthPercent)); - startMem = GC.GetTotalMemory(true); - var token = new CancellationToken(); - for (int i = 0; i < 100; i++) - { - var t = c.RequestAsync(subject, data, int.MaxValue, token); - t.Wait(); - } - GC.Collect(); - Thread.Sleep(5000); + startMem = GC.GetTotalMemory(true); + var token = new CancellationToken(); + for (int i = 0; i < 100; i++) + { + var t = c.RequestAsync(subject, data, int.MaxValue, token); + t.Wait(); + } + GC.Collect(); + Thread.Sleep(5000); - memGrowthPercent = 100 * (((double)(GC.GetTotalMemory(false) - startMem)) / (double)startMem); - Assert.True(memGrowthPercent < 30.0, string.Format("Memory grew {0} percent.", memGrowthPercent)); + memGrowthPercent = 100 * (((double)(GC.GetTotalMemory(false) - startMem)) / (double)startMem); + Assert.True(memGrowthPercent < 30.0, string.Format("Memory grew {0} percent.", memGrowthPercent)); + } } } } diff --git a/src/Tests/IntegrationTests/TestEncoding.cs b/src/Tests/IntegrationTests/TestEncoding.cs index d9b7251c7..c13b83548 100644 --- a/src/Tests/IntegrationTests/TestEncoding.cs +++ b/src/Tests/IntegrationTests/TestEncoding.cs @@ -27,7 +27,9 @@ namespace IntegrationTests /// public class TestEncoding : TestSuite { - public TestEncoding(EncodingSuiteContext context) : base(context) { } + public TestEncoding(EncodingSuiteContext context) : base(context) + { + } public IEncodedConnection DefaultEncodedConnection => Context.OpenEncodedConnectionWithDefaultTimeout(Context.Server1.Port); @@ -44,7 +46,7 @@ public override bool Equals(Object o) if (o.GetType() != this.GetType()) return false; - SerializationTestObj to = (SerializationTestObj)o; + SerializationTestObj to = (SerializationTestObj) o; return (a == to.a && b == to.b && c.Equals(to.c)); } @@ -74,8 +76,8 @@ public void TestDefaultObjectSerialization() EventHandler eh = (sender, args) => { - // Ensure we blow up in the cast - SerializationTestObj so = (SerializationTestObj)args.ReceivedObject; + // Ensure we blow up in the cast + SerializationTestObj so = (SerializationTestObj) args.ReceivedObject; Assert.True(so.Equals(origObj)); lock (mu) @@ -108,16 +110,19 @@ public BasicObj(int value) public int A { get; set; } - public override int GetHashCode() { return base.GetHashCode(); } + public override int GetHashCode() + { + return base.GetHashCode(); + } public override bool Equals(Object o) { if (o.GetType() != GetType()) return false; - BasicObj to = (BasicObj)o; + BasicObj to = (BasicObj) o; - return (A == ((BasicObj)to).A); + return (A == ((BasicObj) to).A); } } @@ -128,33 +133,37 @@ public void TestEncodedDefaultRequestReplyThreadSafety() { using (IEncodedConnection c = DefaultEncodedConnection) { - c.SubscribeAsync("replier", (obj, args) => { + using (c.SubscribeAsync("replier", (obj, args) => + { try { - c.Publish(args.Reply, new BasicObj(((BasicObj)args.ReceivedObject).A)); + c.Publish(args.Reply, new BasicObj(((BasicObj) args.ReceivedObject).A)); } catch (Exception ex) { Assert.True(false, "Replier Exception: " + ex.Message); } - c.Flush(); - }); - c.Flush(); - using (IEncodedConnection c2 = DefaultEncodedConnection) + c.Flush(); + })) { - System.Threading.Tasks.Parallel.For(0, 20, i => + c.Flush(); + + using (IEncodedConnection c2 = DefaultEncodedConnection) { - try + System.Threading.Tasks.Parallel.For(0, 20, i => { - var bo = new BasicObj(i); - Assert.True(bo.Equals(c2.Request("replier", bo, 30000)), "Objects did not equal"); - } - catch (Exception ex) - { - Assert.True(false, "Exception: " + ex.Message); - } - }); + try + { + var bo = new BasicObj(i); + Assert.True(bo.Equals(c2.Request("replier", bo, 30000)), "Objects did not equal"); + } + catch (Exception ex) + { + Assert.True(false, "Exception: " + ex.Message); + } + }); + } } } } @@ -177,10 +186,11 @@ public void TestDefaultObjectSerialization() [DataContract] public class JsonObject { - [DataMember] - public string Value = ""; + [DataMember] public string Value = ""; - public JsonObject() { } + public JsonObject() + { + } public JsonObject(string val) { @@ -189,7 +199,7 @@ public JsonObject(string val) public override bool Equals(object obj) { - return (((JsonObject)obj).Value == Value); + return (((JsonObject) obj).Value == Value); } public override int GetHashCode() @@ -217,7 +227,7 @@ public void TestEncodedObjectSerization() ev.Set(); }; - using (IAsyncSubscription s = c.SubscribeAsync("foo", eh)) + using (c.SubscribeAsync("foo", eh)) { for (int i = 0; i < 10; i++) c.Publish("foo", jo); @@ -228,7 +238,7 @@ public void TestEncodedObjectSerization() } ev.Reset(); - using (IAsyncSubscription s = c.SubscribeAsync("foo", eh)) + using (c.SubscribeAsync("foo", eh)) { c.Publish("foo", "bar", jo); c.Flush(); @@ -255,10 +265,10 @@ public void TestEncodedInvalidObjectSerialization() EventHandler eh = (sender, args) => { - // Ensure we blow up in the cast or not implemented in .NET core - try + // Ensure we blow up in the cast or not implemented in .NET core + try { - Exception invalid = (Exception)args.ReceivedObject; + Exception invalid = (Exception) args.ReceivedObject; } catch (Exception) { @@ -306,7 +316,7 @@ internal byte[] jsonSerializer(object obj) byte[] buffer = stream.GetBuffer(); long len = stream.Position; var rv = new byte[len]; - Array.Copy(buffer, rv, (int)len); + Array.Copy(buffer, rv, (int) len); return rv; #else ArraySegment buffer; @@ -341,7 +351,7 @@ public void TestEncodedSerizationOverrides() EventHandler eh = (sender, args) => { - JsonObject so = (JsonObject)args.ReceivedObject; + JsonObject so = (JsonObject) args.ReceivedObject; Assert.True(so.Equals(origObj)); ev.Set(); @@ -372,25 +382,23 @@ public void TestEncodedObjectRequestReply() EventHandler eh = (sender, args) => { - JsonObject so = (JsonObject)args.ReceivedObject; + JsonObject so = (JsonObject) args.ReceivedObject; Assert.True(so.Equals(origObj)); c.Publish(args.Reply, new JsonObject("Received")); c.Flush(); }; - using (IAsyncSubscription s = c.SubscribeAsync("foo", eh)) + using (c.SubscribeAsync("foo", eh)) { - var jo = (JsonObject)c.Request("foo", origObj, 1000); - Assert.Equal("Received",jo.Value); + var jo = (JsonObject) c.Request("foo", origObj, 1000); + Assert.Equal("Received", jo.Value); - jo = (JsonObject)c.Request("foo", origObj, 1000); - Assert.Equal("Received",jo.Value); + jo = (JsonObject) c.Request("foo", origObj, 1000); + Assert.Equal("Received", jo.Value); } } } } } // class - -} // namespace - +} // namespace \ No newline at end of file diff --git a/src/Tests/IntegrationTests/TestReconnect.cs b/src/Tests/IntegrationTests/TestReconnect.cs index 1942b6c09..9eda74b56 100644 --- a/src/Tests/IntegrationTests/TestReconnect.cs +++ b/src/Tests/IntegrationTests/TestReconnect.cs @@ -121,40 +121,43 @@ public void TestBasicReconnectFunctionality() // NOOP }; - NATSServer ns = NATSServer.Create(Context.Server1.Port); - - using (IConnection c = Context.ConnectionFactory.CreateConnection(opts)) + using (var ns1 = NATSServer.Create(Context.Server1.Port)) { - IAsyncSubscription s = c.SubscribeAsync("foo"); - s.MessageHandler += (sender, args) => + using (IConnection c = Context.ConnectionFactory.CreateConnection(opts)) { - lock (msgLock) + using (var s = c.SubscribeAsync("foo")) { - Monitor.Pulse(msgLock); - } - }; + s.MessageHandler += (sender, args) => + { + lock (msgLock) + { + Monitor.Pulse(msgLock); + } + }; - s.Start(); - c.Flush(); + s.Start(); + c.Flush(); - lock (testLock) - { - ns.Shutdown(); - Assert.True(Monitor.Wait(testLock, 100000)); - } + lock (testLock) + { + ns1.Shutdown(); + Assert.True(Monitor.Wait(testLock, 100000)); + } - c.Publish("foo", Encoding.UTF8.GetBytes("Hello")); + c.Publish("foo", Encoding.UTF8.GetBytes("Hello")); - // restart the server. - using (ns = NATSServer.Create(Context.Server1.Port)) - { - lock (msgLock) - { - c.Flush(50000); - Assert.True(Monitor.Wait(msgLock, 10000)); - } + // restart the server. + using (NATSServer.Create(Context.Server1.Port)) + { + lock (msgLock) + { + c.Flush(50000); + Assert.True(Monitor.Wait(msgLock, 10000)); + } - Assert.True(c.Stats.Reconnects == 1); + Assert.True(c.Stats.Reconnects == 1); + } + } } } } @@ -181,65 +184,69 @@ public void TestExtendedReconnectFunctionality() }; byte[] payload = Encoding.UTF8.GetBytes("bar"); - NATSServer ns = NATSServer.Create(Context.Server1.Port); - - using (IConnection c = Context.ConnectionFactory.CreateConnection(opts)) + using (var ns1 = NATSServer.Create(Context.Server1.Port)) { - IAsyncSubscription s1 = c.SubscribeAsync("foo"); - IAsyncSubscription s2 = c.SubscribeAsync("foobar"); - - s1.MessageHandler += incrReceivedMessageHandler; - s2.MessageHandler += incrReceivedMessageHandler; + using (IConnection c = Context.ConnectionFactory.CreateConnection(opts)) + { + using (var s1 = c.SubscribeAsync("foo")) + { + s1.MessageHandler += incrReceivedMessageHandler; + + using (var s2 = c.SubscribeAsync("foobar")) + { + s2.MessageHandler += incrReceivedMessageHandler; - s1.Start(); - s2.Start(); + s1.Start(); + s2.Start(); - received = 0; + received = 0; - c.Publish("foo", payload); - c.Flush(); + c.Publish("foo", payload); + c.Flush(); - ns.Shutdown(); - // server is stopped here. + ns1.Shutdown(); + // server is stopped here. - Assert.True(disconnectedEvent.WaitOne(20000)); + Assert.True(disconnectedEvent.WaitOne(20000)); - // subscribe to bar while connected. - IAsyncSubscription s3 = c.SubscribeAsync("bar"); - s3.MessageHandler += incrReceivedMessageHandler; - s3.Start(); + // subscribe to bar while connected. + using (var s3 = c.SubscribeAsync("bar")) + { + s3.MessageHandler += incrReceivedMessageHandler; + s3.Start(); - // Unsub foobar while disconnected - s2.Unsubscribe(); + // Unsub foobar while disconnected + s2.Unsubscribe(); - c.Publish("foo", payload); - c.Publish("bar", payload); + c.Publish("foo", payload); + c.Publish("bar", payload); - // server is restarted here... - using (NATSServer ts = NATSServer.Create(Context.Server1.Port)) - { - // wait for reconnect - Assert.True(reconnectedEvent.WaitOne(60000)); + // server is restarted here... + using (NATSServer.Create(Context.Server1.Port)) + { + // wait for reconnect + Assert.True(reconnectedEvent.WaitOne(60000)); - c.Publish("foobar", payload); - c.Publish("foo", payload); + c.Publish("foobar", payload); + c.Publish("foo", payload); - using (IAsyncSubscription s4 = c.SubscribeAsync("done")) - { - AutoResetEvent doneEvent = new AutoResetEvent(false); - s4.MessageHandler += (sender, args) => - { - doneEvent.Set(); - }; + using (IAsyncSubscription s4 = c.SubscribeAsync("done")) + { + AutoResetEvent doneEvent = new AutoResetEvent(false); + s4.MessageHandler += (sender, args) => { doneEvent.Set(); }; - s4.Start(); + s4.Start(); - c.Publish("done", payload); - Assert.True(doneEvent.WaitOne(4000)); + c.Publish("done", payload); + Assert.True(doneEvent.WaitOne(4000)); + } + } // NATSServer + } + } } - } // NATSServer - - Assert.Equal(4, received); + + Assert.Equal(4, received); + } } } @@ -283,7 +290,6 @@ public void TestQueueSubsOnReconnect() { AutoResetEvent reconnectEvent = new AutoResetEvent(false); Options opts = getReconnectOptions(); - IConnection c; string subj = "foo.bar"; string qgroup = "workers"; @@ -295,39 +301,41 @@ public void TestQueueSubsOnReconnect() using(NATSServer ns = NATSServer.Create(Context.Server1.Port)) { - c = Context.ConnectionFactory.CreateConnection(opts); - - EventHandler eh = (sender, args) => + using (var c = Context.ConnectionFactory.CreateConnection(opts)) { - int seq = Convert.ToInt32(Encoding.UTF8.GetString(args.Message.Data)); - - lock (results) + EventHandler eh = (sender, args) => { - if (results.ContainsKey(seq) == false) - results.Add(seq, true); - } - }; + int seq = Convert.ToInt32(Encoding.UTF8.GetString(args.Message.Data)); - // Create Queue Subscribers - c.SubscribeAsync(subj, qgroup, eh); - c.SubscribeAsync(subj, qgroup, eh); + lock (results) + { + if (results.ContainsKey(seq) == false) + results.Add(seq, true); + } + }; - c.Flush(); + // Create Queue Subscribers + c.SubscribeAsync(subj, qgroup, eh); + c.SubscribeAsync(subj, qgroup, eh); - sendAndCheckMsgs(c, subj, 10); - } - // server should stop... + c.Flush(); - // give the OS time to shut it down. - Thread.Sleep(1000); + sendAndCheckMsgs(c, subj, 10); + + ns.Shutdown(); + + // give the OS time to shut it down. + Thread.Sleep(1000); - // start back up - using (NATSServer ns = NATSServer.Create(Context.Server1.Port)) - { - // wait for reconnect - Assert.True(reconnectEvent.WaitOne(6000)); + // start back up + using (NATSServer.Create(Context.Server1.Port)) + { + // wait for reconnect + Assert.True(reconnectEvent.WaitOne(6000)); - sendAndCheckMsgs(c, subj, 10); + sendAndCheckMsgs(c, subj, 10); + } + } } } @@ -340,21 +348,23 @@ public void TestClose() using (NATSServer s1 = NATSServer.Create(Context.Server1.Port)) { - IConnection c = Context.ConnectionFactory.CreateConnection(opts); - Assert.False(c.IsClosed()); - - s1.Shutdown(); - - Thread.Sleep(100); - Assert.False(c.IsClosed(), string.Format("Invalid state, expecting not closed, received: {0}", c.State)); - - using (NATSServer s2 = NATSServer.Create(Context.Server1.Port)) + using (var c = Context.ConnectionFactory.CreateConnection(opts)) { - Thread.Sleep(1000); Assert.False(c.IsClosed()); - - c.Close(); - Assert.True(c.IsClosed()); + + s1.Shutdown(); + + Thread.Sleep(100); + Assert.False(c.IsClosed(), string.Format("Invalid state, expecting not closed, received: {0}", c.State)); + + using (NATSServer s2 = NATSServer.Create(Context.Server1.Port)) + { + Thread.Sleep(1000); + Assert.False(c.IsClosed()); + + c.Close(); + Assert.True(c.IsClosed()); + } } } } @@ -368,9 +378,6 @@ public void TestIsReconnectingAndStatus() bool reconnected = false; object reconnectedLock = new object(); - - IConnection c = null; - Options opts = Context.GetTestOptions(Context.Server1.Port); opts.AllowReconnect = true; opts.MaxReconnect = 10000; @@ -396,41 +403,43 @@ public void TestIsReconnectingAndStatus() using (NATSServer s = NATSServer.Create(Context.Server1.Port)) { - c = Context.ConnectionFactory.CreateConnection(opts); + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { - Assert.True(c.State == ConnState.CONNECTED); - Assert.True(c.IsReconnecting() == false); - } - // server stops here... + Assert.True(c.State == ConnState.CONNECTED); + Assert.True(c.IsReconnecting() == false); + + s.Shutdown(); + + lock (disconnectedLock) + { + if (!disconnected) + Assert.True(Monitor.Wait(disconnectedLock, 10000)); + } - lock (disconnectedLock) - { - if (!disconnected) - Assert.True(Monitor.Wait(disconnectedLock, 10000)); - } + Assert.True(c.State == ConnState.RECONNECTING); + Assert.True(c.IsReconnecting() == true); - Assert.True(c.State == ConnState.RECONNECTING); - Assert.True(c.IsReconnecting() == true); + // restart the server + using (NATSServer.Create(Context.Server1.Port)) + { + lock (reconnectedLock) + { + // may have reconnected, if not, wait + if (!reconnected) + Assert.True(Monitor.Wait(reconnectedLock, 10000)); + } - // restart the server - using (NATSServer s = NATSServer.Create(Context.Server1.Port)) - { - lock (reconnectedLock) - { - // may have reconnected, if not, wait - if (!reconnected) - Assert.True(Monitor.Wait(reconnectedLock, 10000)); - } + Assert.True(c.IsReconnecting() == false); + Assert.True(c.State == ConnState.CONNECTED); - Assert.True(c.IsReconnecting() == false); - Assert.True(c.State == ConnState.CONNECTED); + c.Close(); + } - c.Close(); + Assert.True(c.IsReconnecting() == false); + Assert.True(c.State == ConnState.CLOSED); + } } - - Assert.True(c.IsReconnecting() == false); - Assert.True(c.State == ConnState.CLOSED); - } @@ -438,8 +447,6 @@ public void TestIsReconnectingAndStatus() public void TestReconnectVerbose() { // an exception stops and fails the test. - IConnection c = null; - Object reconnectLock = new Object(); bool reconnected = false; @@ -457,22 +464,23 @@ public void TestReconnectVerbose() using (NATSServer s = NATSServer.Create(Context.Server1.Port)) { - c = Context.ConnectionFactory.CreateConnection(opts); - c.Flush(); - - // exit the block and enter a new server block - this - // restarts the server. - } - - using (NATSServer s = NATSServer.Create(Context.Server1.Port)) - { - lock (reconnectLock) + using (var c = Context.ConnectionFactory.CreateConnection(opts)) { - if (!reconnected) - Monitor.Wait(reconnectLock, 5000); - } + c.Flush(); + + s.Shutdown(); + + using ( NATSServer.Create(Context.Server1.Port)) + { + lock (reconnectLock) + { + if (!reconnected) + Monitor.Wait(reconnectLock, 5000); + } - c.Flush(); + c.Flush(); + } + } } } @@ -489,9 +497,6 @@ public void TestReconnectBufferProperty() [Fact] public void TestReconnectBufferDisabled() { - IConnection c; - ISyncSubscription s; - AutoResetEvent disconnected = new AutoResetEvent(false); AutoResetEvent reconnected = new AutoResetEvent(false); @@ -503,37 +508,36 @@ public void TestReconnectBufferDisabled() using (var server = NATSServer.Create(Context.Server1.Port)) { // Create our client connections. - c = new ConnectionFactory().CreateConnection(opts); - s = c.SubscribeSync("foo"); - // let the server shutdown via dispose - } - - // wait until we're disconnected. - Assert.True(disconnected.WaitOne(5000)); - + using (var c = new ConnectionFactory().CreateConnection(opts)) + { + using (var s = c.SubscribeSync("foo")) + { + server.Shutdown(); + + // wait until we're disconnected. + Assert.True(disconnected.WaitOne(5000)); - // Publish a message. - Assert.Throws( () => { c.Publish("foo", null); }); + // Publish a message. + Assert.Throws( () => { c.Publish("foo", null); }); - using (var server = NATSServer.Create(Context.Server1.Port)) - { - // wait for the client to reconnect. - Assert.True(reconnected.WaitOne(20000)); + using (NATSServer.Create(Context.Server1.Port)) + { + // wait for the client to reconnect. + Assert.True(reconnected.WaitOne(20000)); - // Check that we do not receive a message. - Assert.Throws(() => { s.NextMessage(1000); }); + // Check that we do not receive a message. + Assert.Throws(() => { s.NextMessage(1000); }); + + c.Close(); + } + } + } } - - c.Close(); - c.Dispose(); } [Fact] public void TestReconnectBufferBoundary() { - IConnection c; - ISubscription s; - AutoResetEvent disconnected = new AutoResetEvent(false); var opts = Context.GetTestOptions(Context.Server1.Port); @@ -543,23 +547,25 @@ public void TestReconnectBufferBoundary() using (var server = NATSServer.Create(Context.Server1.Port)) { - c = new ConnectionFactory().CreateConnection(opts); - s = c.SubscribeAsync("foo", eh); - - // let the server shutdown via dispose + using (var c = new ConnectionFactory().CreateConnection(opts)) + { + using ( c.SubscribeAsync("foo", eh)) + { + server.Shutdown(); + + // wait until we're disconnected. + Assert.True(disconnected.WaitOne(5000)); + + // PUB foo 25\r\n<...> = 30 so first publish should be OK, 2nd publish + // should fail. + byte[] payload = new byte[18]; + c.Publish("foo", payload); + Assert.Throws(() => c.Publish("foo", payload)); + + c.Close(); + } + } } - - // wait until we're disconnected. - Assert.True(disconnected.WaitOne(5000)); - - // PUB foo 25\r\n<...> = 30 so first publish should be OK, 2nd publish - // should fail. - byte[] payload = new byte[18]; - c.Publish("foo", payload); - Assert.Throws(() => c.Publish("foo", payload)); - - c.Close(); - c.Dispose(); } } diff --git a/src/Tests/IntegrationTests/TestSubscriptions.cs b/src/Tests/IntegrationTests/TestSubscriptions.cs index 5f8940937..756794bde 100644 --- a/src/Tests/IntegrationTests/TestSubscriptions.cs +++ b/src/Tests/IntegrationTests/TestSubscriptions.cs @@ -421,16 +421,20 @@ public void TestNextMessageOnClosedSub() { using (IConnection c = Context.OpenConnection(Context.Server1.Port)) { - ISyncSubscription s = c.SubscribeSync("foo"); - s.Unsubscribe(); - - try + using (var s = c.SubscribeSync("foo")) { - s.NextMessage(); - } - catch (NATSBadSubscriptionException) { } // ignore. + s.Unsubscribe(); + + try + { + s.NextMessage(); + } + catch (NATSBadSubscriptionException) + { + } // ignore. - // any other exceptions will fail the test. + // any other exceptions will fail the test. + } } } } @@ -450,7 +454,7 @@ public void TestAsyncSubscriptionPending() { using (IConnection c = Context.OpenConnection(Context.Server1.Port)) { - ISubscription s = c.SubscribeAsync("foo", (sender, args) => + using (var s = c.SubscribeAsync("foo", (sender, args) => { evStart.WaitOne(60000); @@ -459,75 +463,77 @@ public void TestAsyncSubscriptionPending() { evSubDone.Set(); } - }); - - for (int i = 0; i < total; i++) + })) { - c.Publish("foo", data); - } - c.Flush(); + for (int i = 0; i < total; i++) + { + c.Publish("foo", data); + } - Thread.Sleep(1000); + c.Flush(); + + Thread.Sleep(1000); - int expectedPendingCount = total - 1; + int expectedPendingCount = total - 1; - // At least 1 message will be dequeued - Assert.True(s.QueuedMessageCount <= expectedPendingCount); + // At least 1 message will be dequeued + Assert.True(s.QueuedMessageCount <= expectedPendingCount); - Assert.True((s.MaxPendingBytes == (data.Length * total)) || - (s.MaxPendingBytes == (data.Length * expectedPendingCount))); - Assert.True((s.MaxPendingMessages == total) || - (s.MaxPendingMessages == expectedPendingCount)); - Assert.True((s.PendingBytes == (data.Length * total)) || - (s.PendingBytes == (data.Length * expectedPendingCount))); + Assert.True((s.MaxPendingBytes == (data.Length * total)) || + (s.MaxPendingBytes == (data.Length * expectedPendingCount))); + Assert.True((s.MaxPendingMessages == total) || + (s.MaxPendingMessages == expectedPendingCount)); + Assert.True((s.PendingBytes == (data.Length * total)) || + (s.PendingBytes == (data.Length * expectedPendingCount))); - long pendingBytes; - long pendingMsgs; + long pendingBytes; + long pendingMsgs; - s.GetPending(out pendingBytes, out pendingMsgs); - Assert.True(pendingBytes == s.PendingBytes); - Assert.True(pendingMsgs == s.PendingMessages); + s.GetPending(out pendingBytes, out pendingMsgs); + Assert.True(pendingBytes == s.PendingBytes); + Assert.True(pendingMsgs == s.PendingMessages); - long maxPendingBytes; - long maxPendingMsgs; - s.GetMaxPending(out maxPendingBytes, out maxPendingMsgs); - Assert.True(maxPendingBytes == s.MaxPendingBytes); - Assert.True(maxPendingMsgs == s.MaxPendingMessages); + long maxPendingBytes; + long maxPendingMsgs; + s.GetMaxPending(out maxPendingBytes, out maxPendingMsgs); + Assert.True(maxPendingBytes == s.MaxPendingBytes); + Assert.True(maxPendingMsgs == s.MaxPendingMessages); - Assert.True((s.PendingMessages == total) || - (s.PendingMessages == expectedPendingCount)); + Assert.True((s.PendingMessages == total) || + (s.PendingMessages == expectedPendingCount)); - Assert.True(s.Delivered == 1); - Assert.True(s.Dropped == 0); + Assert.True(s.Delivered == 1); + Assert.True(s.Dropped == 0); - evStart.Set(); - evSubDone.WaitOne(10000); + evStart.Set(); + evSubDone.WaitOne(10000); - Assert.True(s.QueuedMessageCount == 0); + Assert.True(s.QueuedMessageCount == 0); - Assert.True((s.MaxPendingBytes == (data.Length * total)) || - (s.MaxPendingBytes == (data.Length * expectedPendingCount))); - Assert.True((s.MaxPendingMessages == total) || - (s.MaxPendingMessages == expectedPendingCount)); + Assert.True((s.MaxPendingBytes == (data.Length * total)) || + (s.MaxPendingBytes == (data.Length * expectedPendingCount))); + Assert.True((s.MaxPendingMessages == total) || + (s.MaxPendingMessages == expectedPendingCount)); - Assert.True(s.PendingMessages == 0); - Assert.True(s.PendingBytes == 0); + Assert.True(s.PendingMessages == 0); + Assert.True(s.PendingBytes == 0); - Assert.True(s.Delivered == total); - Assert.True(s.Dropped == 0); + Assert.True(s.Delivered == total); + Assert.True(s.Dropped == 0); - s.Unsubscribe(); + s.Unsubscribe(); - Assert.ThrowsAny(() => s.MaxPendingBytes); + Assert.ThrowsAny(() => s.MaxPendingBytes); - Assert.ThrowsAny(() => s.MaxPendingMessages); + Assert.ThrowsAny(() => s.MaxPendingMessages); - Assert.ThrowsAny(() => s.PendingMessageLimit); + Assert.ThrowsAny(() => s.PendingMessageLimit); - Assert.ThrowsAny(() => s.PendingByteLimit); + Assert.ThrowsAny(() => s.PendingByteLimit); - Assert.ThrowsAny(() => s.SetPendingLimits(1, 10)); + Assert.ThrowsAny(() => s.SetPendingLimits(1, 10)); + } } } } @@ -550,7 +556,7 @@ public void TestAsyncPendingSubscriptionBatchSizeExactlyOne() using (IConnection c = Context.ConnectionFactory.CreateConnection(opts)) { - ISubscription s = c.SubscribeAsync("foo", (sender, args) => + using (var s = c.SubscribeAsync("foo", (sender, args) => { evStart.WaitOne(60000); @@ -559,65 +565,67 @@ public void TestAsyncPendingSubscriptionBatchSizeExactlyOne() { evSubDone.Set(); } - }); - - for (int i = 0; i < total; i++) + })) { - c.Publish("foo", data); - } - c.Flush(); + for (int i = 0; i < total; i++) + { + c.Publish("foo", data); + } + + c.Flush(); - Thread.Sleep(1000); + Thread.Sleep(1000); - int expectedPendingCount = total - 1; + int expectedPendingCount = total - 1; - // Exactly 1 message will be dequeued - Assert.True(s.QueuedMessageCount == expectedPendingCount); + // Exactly 1 message will be dequeued + Assert.True(s.QueuedMessageCount == expectedPendingCount); - Assert.True((s.MaxPendingBytes == (data.Length * total)) || - (s.MaxPendingBytes == (data.Length * expectedPendingCount))); - Assert.True((s.MaxPendingMessages == total) || - (s.MaxPendingMessages == expectedPendingCount)); - Assert.True((s.PendingBytes == (data.Length * total)) || - (s.PendingBytes == (data.Length * expectedPendingCount))); + Assert.True((s.MaxPendingBytes == (data.Length * total)) || + (s.MaxPendingBytes == (data.Length * expectedPendingCount))); + Assert.True((s.MaxPendingMessages == total) || + (s.MaxPendingMessages == expectedPendingCount)); + Assert.True((s.PendingBytes == (data.Length * total)) || + (s.PendingBytes == (data.Length * expectedPendingCount))); - long pendingBytes; - long pendingMsgs; + long pendingBytes; + long pendingMsgs; - s.GetPending(out pendingBytes, out pendingMsgs); - Assert.True(pendingBytes == s.PendingBytes); - Assert.True(pendingMsgs == s.PendingMessages); + s.GetPending(out pendingBytes, out pendingMsgs); + Assert.True(pendingBytes == s.PendingBytes); + Assert.True(pendingMsgs == s.PendingMessages); - long maxPendingBytes; - long maxPendingMsgs; - s.GetMaxPending(out maxPendingBytes, out maxPendingMsgs); - Assert.True(maxPendingBytes == s.MaxPendingBytes); - Assert.True(maxPendingMsgs == s.MaxPendingMessages); + long maxPendingBytes; + long maxPendingMsgs; + s.GetMaxPending(out maxPendingBytes, out maxPendingMsgs); + Assert.True(maxPendingBytes == s.MaxPendingBytes); + Assert.True(maxPendingMsgs == s.MaxPendingMessages); - Assert.True((s.PendingMessages == total) || - (s.PendingMessages == expectedPendingCount)); + Assert.True((s.PendingMessages == total) || + (s.PendingMessages == expectedPendingCount)); - Assert.True(s.Delivered == 1); - Assert.True(s.Dropped == 0); + Assert.True(s.Delivered == 1); + Assert.True(s.Dropped == 0); - evStart.Set(); - evSubDone.WaitOne(10000); + evStart.Set(); + evSubDone.WaitOne(10000); - Assert.True(s.QueuedMessageCount == 0); + Assert.True(s.QueuedMessageCount == 0); - Assert.True((s.MaxPendingBytes == (data.Length * total)) || - (s.MaxPendingBytes == (data.Length * expectedPendingCount))); - Assert.True((s.MaxPendingMessages == total) || - (s.MaxPendingMessages == expectedPendingCount)); + Assert.True((s.MaxPendingBytes == (data.Length * total)) || + (s.MaxPendingBytes == (data.Length * expectedPendingCount))); + Assert.True((s.MaxPendingMessages == total) || + (s.MaxPendingMessages == expectedPendingCount)); - Assert.True(s.PendingMessages == 0); - Assert.True(s.PendingBytes == 0); + Assert.True(s.PendingMessages == 0); + Assert.True(s.PendingBytes == 0); - Assert.True(s.Delivered == total); - Assert.True(s.Dropped == 0); + Assert.True(s.Delivered == total); + Assert.True(s.Dropped == 0); - s.Unsubscribe(); + s.Unsubscribe(); + } } } } @@ -633,40 +641,42 @@ public void TestSyncSubscriptionPending() { using (IConnection c = Context.OpenConnection(Context.Server1.Port)) { - ISyncSubscription s = c.SubscribeSync("foo"); - - for (int i = 0; i < total; i++) + using (var s = c.SubscribeSync("foo")) { - c.Publish("foo", data); - } - c.Flush(); + for (int i = 0; i < total; i++) + { + c.Publish("foo", data); + } - Assert.True(s.QueuedMessageCount == total); + c.Flush(); - Assert.True((s.MaxPendingBytes == (data.Length * total)) || - (s.MaxPendingBytes == (data.Length * total))); - Assert.True((s.MaxPendingMessages == total) || - (s.MaxPendingMessages == total)); + Assert.True(s.QueuedMessageCount == total); - Assert.True(s.Delivered == 0); - Assert.True(s.Dropped == 0); + Assert.True((s.MaxPendingBytes == (data.Length * total)) || + (s.MaxPendingBytes == (data.Length * total))); + Assert.True((s.MaxPendingMessages == total) || + (s.MaxPendingMessages == total)); - for (int i = 0; i < total; i++) - { - s.NextMessage(); - } + Assert.True(s.Delivered == 0); + Assert.True(s.Dropped == 0); + + for (int i = 0; i < total; i++) + { + s.NextMessage(); + } - Assert.True(s.QueuedMessageCount == 0); + Assert.True(s.QueuedMessageCount == 0); - Assert.True((s.MaxPendingBytes == (data.Length * total)) || - (s.MaxPendingBytes == (data.Length * total))); - Assert.True((s.MaxPendingMessages == total) || - (s.MaxPendingMessages == total)); + Assert.True((s.MaxPendingBytes == (data.Length * total)) || + (s.MaxPendingBytes == (data.Length * total))); + Assert.True((s.MaxPendingMessages == total) || + (s.MaxPendingMessages == total)); - Assert.True(s.Delivered == total); - Assert.True(s.Dropped == 0); + Assert.True(s.Delivered == total); + Assert.True(s.Dropped == 0); - s.Unsubscribe(); + s.Unsubscribe(); + } } } } @@ -682,24 +692,26 @@ public void TestAsyncSubscriptionPendingDrain() { using (IConnection c = Context.OpenConnection(Context.Server1.Port)) { - ISubscription s = c.SubscribeAsync("foo", (sender, args) => { }); - - for (int i = 0; i < total; i++) + using (var s = c.SubscribeAsync("foo", (sender, args) => { })) { - c.Publish("foo", data); - } - c.Flush(); + for (int i = 0; i < total; i++) + { + c.Publish("foo", data); + } - while (s.Delivered != total) - { - Thread.Sleep(50); - } + c.Flush(); + + while (s.Delivered != total) + { + Thread.Sleep(50); + } - Assert.True(s.Dropped == 0); - Assert.True(s.PendingBytes == 0); - Assert.True(s.PendingMessages == 0); + Assert.True(s.Dropped == 0); + Assert.True(s.PendingBytes == 0); + Assert.True(s.PendingMessages == 0); - s.Unsubscribe(); + s.Unsubscribe(); + } } } } @@ -715,24 +727,26 @@ public void TestSyncSubscriptionPendingDrain() { using (IConnection c = Context.OpenConnection(Context.Server1.Port)) { - ISyncSubscription s = c.SubscribeSync("foo"); - - for (int i = 0; i < total; i++) + using (var s = c.SubscribeSync("foo")) { - c.Publish("foo", data); - } - c.Flush(); + for (int i = 0; i < total; i++) + { + c.Publish("foo", data); + } - while (s.Delivered != total) - { - s.NextMessage(100); - } + c.Flush(); + + while (s.Delivered != total) + { + s.NextMessage(100); + } - Assert.True(s.Dropped == 0); - Assert.True(s.PendingBytes == 0); - Assert.True(s.PendingMessages == 0); + Assert.True(s.Dropped == 0); + Assert.True(s.PendingBytes == 0); + Assert.True(s.PendingMessages == 0); - s.Unsubscribe(); + s.Unsubscribe(); + } } } } @@ -758,47 +772,50 @@ public void TestSubDelTaskCountBasic() AutoResetEvent ev1 = new AutoResetEvent(false); AutoResetEvent ev2 = new AutoResetEvent(false); - IAsyncSubscription s1 = c.SubscribeAsync("foo", (obj, args) => + using (var s1 = c.SubscribeAsync("foo", (obj, args) => { s1Count++; if (s1Count == COUNT) { ev1.Set(); } - }); - - IAsyncSubscription s2 = c.SubscribeAsync("bar", (obj, args) => + })) { - s2Count++; - if (s2Count >= COUNT) + using (var s2 = c.SubscribeAsync("bar", (obj, args) => { - ev2.Set(); - } - }); + s2Count++; + if (s2Count >= COUNT) + { + ev2.Set(); + } + })) + { + for (int i = 0; i < 10; i++) + { + c.Publish("foo", null); + c.Publish("bar", null); + } - for (int i = 0; i < 10; i++) - { - c.Publish("foo", null); - c.Publish("bar", null); - } - c.Flush(); + c.Flush(); - Assert.True(ev1.WaitOne(10000)); - Assert.True(ev2.WaitOne(10000)); - s1.Unsubscribe(); + Assert.True(ev1.WaitOne(10000)); + Assert.True(ev2.WaitOne(10000)); + s1.Unsubscribe(); - Assert.True(s1Count == COUNT); - Assert.True(s2Count == COUNT); + Assert.True(s1Count == COUNT); + Assert.True(s2Count == COUNT); - ev2.Reset(); + ev2.Reset(); - c.Publish("bar", null); - c.Flush(); + c.Publish("bar", null); + c.Flush(); - Assert.True(ev2.WaitOne(10000)); - Assert.True(s2Count == COUNT + 1); + Assert.True(ev2.WaitOne(10000)); + Assert.True(s2Count == COUNT + 1); - s2.Unsubscribe(); + s2.Unsubscribe(); + } + } } } } @@ -839,7 +856,11 @@ public void TestSubDelTaskCountScaling() // ensure we are not creating a thread per subscriber. Assert.True(Process.GetCurrentProcess().Threads.Count < 500); - subs.ForEach((s) => { s.Unsubscribe(); }); + subs.ForEach(s => + { + s.Unsubscribe(); + s.Dispose(); + }); } } } @@ -985,9 +1006,11 @@ public void TestSubDelTaskCountWithSyncSub() { using (IConnection c = Context.ConnectionFactory.CreateConnection(opts)) { - ISyncSubscription s = c.SubscribeSync("foo"); - c.Publish("foo", null); - s.NextMessage(10000); + using (var s = c.SubscribeSync("foo")) + { + c.Publish("foo", null); + s.NextMessage(10000); + } } } } @@ -1001,24 +1024,25 @@ public void TestInvalidSubjects() EventHandler mh = (obj, args) => { /* NOOP */ }; using (NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - var c = Context.OpenConnection(Context.Server1.Port); - - foreach (string s in invalidSubjects) + using (var c = Context.OpenConnection(Context.Server1.Port)) { - Assert.Throws(() => c.SubscribeSync(s)); - Assert.Throws(() => c.SubscribeSync(s, "qgroup")); - Assert.Throws(() => c.SubscribeAsync(s)); - Assert.Throws(() => c.SubscribeAsync(s, mh)); - Assert.Throws(() => c.SubscribeAsync(s, "qgroup")); - Assert.Throws(() => c.SubscribeAsync(s, "qgroup", mh)); - } + foreach (string s in invalidSubjects) + { + Assert.Throws(() => c.SubscribeSync(s)); + Assert.Throws(() => c.SubscribeSync(s, "qgroup")); + Assert.Throws(() => c.SubscribeAsync(s)); + Assert.Throws(() => c.SubscribeAsync(s, mh)); + Assert.Throws(() => c.SubscribeAsync(s, "qgroup")); + Assert.Throws(() => c.SubscribeAsync(s, "qgroup", mh)); + } - foreach (string s in invalidQNames) - { - Assert.Throws(() => c.SubscribeSync("subject", s)); + foreach (string s in invalidQNames) + { + Assert.Throws(() => c.SubscribeSync("subject", s)); - Assert.Throws(() => c.SubscribeAsync("subject", s)); - Assert.Throws(() => c.SubscribeAsync("subject", s, mh)); + Assert.Throws(() => c.SubscribeAsync("subject", s)); + Assert.Throws(() => c.SubscribeAsync("subject", s, mh)); + } } } } @@ -1132,21 +1156,20 @@ public void TestRespondFailsWithClosedConnection() { using (IConnection c = Context.OpenConnection(Context.Server1.Port)) { - ISyncSubscription s = c.SubscribeSync("foo"); - - string replyTo = c.NewInbox(); - c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message")); - - Msg m = s.NextMessage(1000); - Assert.NotNull(m); - Assert.Equal(replyTo, m.Reply); + using (var s = c.SubscribeSync("foo")) + { + string replyTo = c.NewInbox(); + c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message")); - c.Close(); + Msg m = s.NextMessage(1000); + Assert.NotNull(m); + Assert.Equal(replyTo, m.Reply); - byte[] reply = Encoding.UTF8.GetBytes("reply"); - Assert.ThrowsAny(() => m.Respond(reply)); + c.Close(); - s.Dispose(); + byte[] reply = Encoding.UTF8.GetBytes("reply"); + Assert.ThrowsAny(() => m.Respond(reply)); + } } } } @@ -1154,40 +1177,33 @@ public void TestRespondFailsWithClosedConnection() [Fact] public void TestRespondFailsWithServerClosed() { - IConnection c = null; - ISyncSubscription s = null; - try + Msg m; + using (NATSServer ns = NATSServer.CreateFastAndVerify(Context.Server1.Port)) { - Msg m; - using (NATSServer ns = NATSServer.CreateFastAndVerify(Context.Server1.Port)) - { - Options options = Context.GetTestOptions(Context.Server1.Port); - options.AllowReconnect = false; - - c = Context.ConnectionFactory.CreateConnection(options); - s = c.SubscribeSync("foo"); + Options options = Context.GetTestOptions(Context.Server1.Port); + options.AllowReconnect = false; - string replyTo = c.NewInbox(); + using (var c = Context.ConnectionFactory.CreateConnection(options)) + { + using (var s = c.SubscribeSync("foo")) + { + string replyTo = c.NewInbox(); - c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message")); + c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message")); - m = s.NextMessage(1000); - Assert.NotNull(m); - Assert.Equal(replyTo, m.Reply); + m = s.NextMessage(1000); + Assert.NotNull(m); + Assert.Equal(replyTo, m.Reply); + + ns.Shutdown(); + + // Give the server time to close + Thread.Sleep(2000); - ns.Shutdown(); + byte[] reply = Encoding.UTF8.GetBytes("reply"); + Assert.ThrowsAny(() => m.Respond(reply)); + } } - - // Give the server time to close - Thread.Sleep(2000); - - byte[] reply = Encoding.UTF8.GetBytes("reply"); - Assert.ThrowsAny(() => m.Respond(reply)); - } - finally - { - c?.Dispose(); - s?.Dispose(); } } } diff --git a/src/Tests/IntegrationTests/TestTLS.cs b/src/Tests/IntegrationTests/TestTLS.cs index 526ebad5a..589d018b7 100644 --- a/src/Tests/IntegrationTests/TestTLS.cs +++ b/src/Tests/IntegrationTests/TestTLS.cs @@ -246,13 +246,15 @@ public void TestTlsReconnectAuthTimeout() ev.Set(); }; - IConnection c = Context.ConnectionFactory.CreateConnection(opts); - s1.Shutdown(); + using (Context.ConnectionFactory.CreateConnection(opts)) + { + s1.Shutdown(); - // This should fail over to S2 where an authorization timeout occurs - // then successfully reconnect to S3. + // This should fail over to S2 where an authorization timeout occurs + // then successfully reconnect to S3. - Assert.True(ev.WaitOne(20000)); + Assert.True(ev.WaitOne(20000)); + } } } @@ -280,30 +282,31 @@ public void TestTlsReconnectAuthTimeoutLateClose() ev.Set(); }; - IConnection c = Context.ConnectionFactory.CreateConnection(opts); - - // inject an authorization timeout, as if it were processed by an incoming server message. - // this is done at the parser level so that parsing is also tested, - // therefore it needs reflection since Parser is an internal type. - Type parserType = typeof(Connection).Assembly.GetType("NATS.Client.Parser"); - Assert.NotNull(parserType); + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + // inject an authorization timeout, as if it were processed by an incoming server message. + // this is done at the parser level so that parsing is also tested, + // therefore it needs reflection since Parser is an internal type. + Type parserType = typeof(Connection).Assembly.GetType("NATS.Client.Parser"); + Assert.NotNull(parserType); - BindingFlags flags = BindingFlags.NonPublic | BindingFlags.Instance; - object parser = Activator.CreateInstance(parserType, flags, null, new object[] { c }, null); - Assert.NotNull(parser); + BindingFlags flags = BindingFlags.NonPublic | BindingFlags.Instance; + object parser = Activator.CreateInstance(parserType, flags, null, new object[] {c}, null); + Assert.NotNull(parser); - MethodInfo parseMethod = parserType.GetMethod("parse", flags); - Assert.NotNull(parseMethod); + MethodInfo parseMethod = parserType.GetMethod("parse", flags); + Assert.NotNull(parseMethod); - byte[] bytes = "-ERR 'Authorization Timeout'\r\n".ToCharArray().Select(ch => (byte)ch).ToArray(); - parseMethod.Invoke(parser, new object[] { bytes, bytes.Length }); + byte[] bytes = "-ERR 'Authorization Timeout'\r\n".ToCharArray().Select(ch => (byte) ch).ToArray(); + parseMethod.Invoke(parser, new object[] {bytes, bytes.Length}); - // sleep to allow the client to process the error, then shutdown the server. - Thread.Sleep(250); - s1.Shutdown(); + // sleep to allow the client to process the error, then shutdown the server. + Thread.Sleep(250); + s1.Shutdown(); - // Wait for a reconnect. - Assert.True(ev.WaitOne(20000)); + // Wait for a reconnect. + Assert.True(ev.WaitOne(20000)); + } } } #endif diff --git a/src/Tests/IntegrationTests/TestUtilities.cs b/src/Tests/IntegrationTests/TestUtilities.cs index 946ff8177..ea0bcb81e 100644 --- a/src/Tests/IntegrationTests/TestUtilities.cs +++ b/src/Tests/IntegrationTests/TestUtilities.cs @@ -89,8 +89,8 @@ public static NATSServer CreateFastAndVerify(int port, string args = null) { try { - var c = cf.CreateConnection(opts); - c.Close(); + using(var c = cf.CreateConnection(opts)) + c.Close(); isVerifiedOk = true; break; } @@ -162,15 +162,24 @@ private static void stopProcess(Process p) { try { - var successfullyClosed = p.CloseMainWindow() || p.WaitForExit(100); - if (!successfullyClosed) + var s = false; + + if (p.MainWindowHandle != IntPtr.Zero) + s = p.CloseMainWindow(); + + if (!s) p.Kill(); - p.Close(); + + p.WaitForExit(250); } catch (Exception) { // ignored } + finally + { + p.Close(); + } } public void Shutdown() @@ -253,7 +262,7 @@ private TestSync(int numOfActors) private void Wait(int aquireCount, TimeSpan ts) { - if (System.Diagnostics.Debugger.IsAttached) + if (Debugger.IsAttached) { ts = TimeSpan.FromMilliseconds(-1); } diff --git a/src/Tests/UnitTests/UnitTests.csproj b/src/Tests/UnitTests/UnitTests.csproj index deb4c1601..6f9e449ac 100644 --- a/src/Tests/UnitTests/UnitTests.csproj +++ b/src/Tests/UnitTests/UnitTests.csproj @@ -9,7 +9,7 @@ - + all