Skip to content

Commit

Permalink
move assertion
Browse files Browse the repository at this point in the history
  • Loading branch information
timbussmann committed Sep 18, 2023
1 parent ec056ab commit 6c900ff
Showing 1 changed file with 50 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,59 +1,62 @@
namespace NServiceBus.TransportTests;

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Transport;

public class When_multiple_messages_are_available : NServiceBusTransportTest
namespace NServiceBus.TransportTests
{
[TestCase(TransportTransactionMode.None)]
[TestCase(TransportTransactionMode.ReceiveOnly)]
[TestCase(TransportTransactionMode.SendsAtomicWithReceive)]
[TestCase(TransportTransactionMode.TransactionScope)]
public async Task Should_handle_messages_concurrently(TransportTransactionMode transactionMode)
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Transport;

public class When_multiple_messages_are_available : NServiceBusTransportTest
{
const int concurrencyLevel = 10;
var onMessageCalls = new ConcurrentQueue<TaskCompletionSource>();
[TestCase(TransportTransactionMode.None)]
[TestCase(TransportTransactionMode.ReceiveOnly)]
[TestCase(TransportTransactionMode.SendsAtomicWithReceive)]
[TestCase(TransportTransactionMode.TransactionScope)]
public async Task Should_handle_messages_concurrently(TransportTransactionMode transactionMode)
{
const int concurrencyLevel = 10;
var onMessageCalls = new ConcurrentQueue<TaskCompletionSource>();

await StartPump(async (context, _) =>
{
var tcs = CreateTaskCompletionSource();
onMessageCalls.Enqueue(tcs);
// "block" current pipeline invocation
await tcs.Task;
},
(errorContext, __) => throw new Exception("unexpected error", errorContext.Exception),
transactionMode,
pushRuntimeSettings: new PushRuntimeSettings(concurrencyLevel));
await StartPump(async (context, _) =>
{
var tcs = CreateTaskCompletionSource();
onMessageCalls.Enqueue(tcs);
// "block" current pipeline invocation
await tcs.Task;
},
(errorContext, __) => throw new Exception("unexpected error", errorContext.Exception),
transactionMode,
pushRuntimeSettings: new PushRuntimeSettings(concurrencyLevel));

for (int i = 0; i < concurrencyLevel * 2; i++)
{
await SendMessage(InputQueueName);
}
for (int i = 0; i < concurrencyLevel * 2; i++)
{
await SendMessage(InputQueueName);
}

// we need to wait because it might take a bit till the pump has invoked all pipelines?
while (onMessageCalls.Count < concurrencyLevel)
{
await Task.Delay(50, TestTimeoutCancellationToken);
}
// we need to wait because it might take a bit till the pump has invoked all pipelines?
while (onMessageCalls.Count < concurrencyLevel)
{
await Task.Delay(50, TestTimeoutCancellationToken);
}

Assert.AreEqual(concurrencyLevel, onMessageCalls.Count, "should not process more messages than configured at once");
int maximumConcurrentMessages = onMessageCalls.Count;

// unblock pumps
int messagesProcessed = 0;
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
while (messagesProcessed < concurrencyLevel * 2)
{
if (onMessageCalls.TryDequeue(out var messagePipelineTcs))
// unblock pumps
int messagesProcessed = 0;
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
while (messagesProcessed < concurrencyLevel * 2)
{
messagePipelineTcs.SetResult();
messagesProcessed++;
if (onMessageCalls.TryDequeue(out var messagePipelineTcs))
{
messagePipelineTcs.SetResult();
messagesProcessed++;
}
TestTimeoutCancellationToken.ThrowIfCancellationRequested();
}
TestTimeoutCancellationToken.ThrowIfCancellationRequested();

Assert.AreEqual(concurrencyLevel, maximumConcurrentMessages, "should not process more messages than configured at once");
Assert.AreEqual(concurrencyLevel * 2, messagesProcessed, "should process all enqueued messages");
}
Assert.AreEqual(concurrencyLevel * 2, messagesProcessed, "should process all enqueued messages");
}
}

0 comments on commit 6c900ff

Please sign in to comment.