diff --git a/src/NServiceBus.TransportTests/When_multiple_messages_are_available.cs b/src/NServiceBus.TransportTests/When_multiple_messages_are_available.cs index 7b7313a66d..eb16275e5a 100644 --- a/src/NServiceBus.TransportTests/When_multiple_messages_are_available.cs +++ b/src/NServiceBus.TransportTests/When_multiple_messages_are_available.cs @@ -1,59 +1,62 @@ -namespace NServiceBus.TransportTests; - -using System; -using System.Collections.Concurrent; -using System.Threading; -using System.Threading.Tasks; -using NUnit.Framework; -using Transport; - -public class When_multiple_messages_are_available : NServiceBusTransportTest +namespace NServiceBus.TransportTests { - [TestCase(TransportTransactionMode.None)] - [TestCase(TransportTransactionMode.ReceiveOnly)] - [TestCase(TransportTransactionMode.SendsAtomicWithReceive)] - [TestCase(TransportTransactionMode.TransactionScope)] - public async Task Should_handle_messages_concurrently(TransportTransactionMode transactionMode) + using System; + using System.Collections.Concurrent; + using System.Threading; + using System.Threading.Tasks; + using NUnit.Framework; + using Transport; + + public class When_multiple_messages_are_available : NServiceBusTransportTest { - const int concurrencyLevel = 10; - var onMessageCalls = new ConcurrentQueue(); + [TestCase(TransportTransactionMode.None)] + [TestCase(TransportTransactionMode.ReceiveOnly)] + [TestCase(TransportTransactionMode.SendsAtomicWithReceive)] + [TestCase(TransportTransactionMode.TransactionScope)] + public async Task Should_handle_messages_concurrently(TransportTransactionMode transactionMode) + { + const int concurrencyLevel = 10; + var onMessageCalls = new ConcurrentQueue(); - await StartPump(async (context, _) => - { - var tcs = CreateTaskCompletionSource(); - onMessageCalls.Enqueue(tcs); - // "block" current pipeline invocation - await tcs.Task; - }, - (errorContext, __) => throw new Exception("unexpected error", errorContext.Exception), - transactionMode, - pushRuntimeSettings: new PushRuntimeSettings(concurrencyLevel)); + await StartPump(async (context, _) => + { + var tcs = CreateTaskCompletionSource(); + onMessageCalls.Enqueue(tcs); + // "block" current pipeline invocation + await tcs.Task; + }, + (errorContext, __) => throw new Exception("unexpected error", errorContext.Exception), + transactionMode, + pushRuntimeSettings: new PushRuntimeSettings(concurrencyLevel)); - for (int i = 0; i < concurrencyLevel * 2; i++) - { - await SendMessage(InputQueueName); - } + for (int i = 0; i < concurrencyLevel * 2; i++) + { + await SendMessage(InputQueueName); + } - // we need to wait because it might take a bit till the pump has invoked all pipelines? - while (onMessageCalls.Count < concurrencyLevel) - { - await Task.Delay(50, TestTimeoutCancellationToken); - } + // we need to wait because it might take a bit till the pump has invoked all pipelines? + while (onMessageCalls.Count < concurrencyLevel) + { + await Task.Delay(50, TestTimeoutCancellationToken); + } - Assert.AreEqual(concurrencyLevel, onMessageCalls.Count, "should not process more messages than configured at once"); + int maximumConcurrentMessages = onMessageCalls.Count; - // unblock pumps - int messagesProcessed = 0; - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - while (messagesProcessed < concurrencyLevel * 2) - { - if (onMessageCalls.TryDequeue(out var messagePipelineTcs)) + // unblock pumps + int messagesProcessed = 0; + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + while (messagesProcessed < concurrencyLevel * 2) { - messagePipelineTcs.SetResult(); - messagesProcessed++; + if (onMessageCalls.TryDequeue(out var messagePipelineTcs)) + { + messagePipelineTcs.SetResult(); + messagesProcessed++; + } + TestTimeoutCancellationToken.ThrowIfCancellationRequested(); } - TestTimeoutCancellationToken.ThrowIfCancellationRequested(); + + Assert.AreEqual(concurrencyLevel, maximumConcurrentMessages, "should not process more messages than configured at once"); + Assert.AreEqual(concurrencyLevel * 2, messagesProcessed, "should process all enqueued messages"); } - Assert.AreEqual(concurrencyLevel * 2, messagesProcessed, "should process all enqueued messages"); } } \ No newline at end of file