diff --git a/RockLib.Messaging.Tests/FakeMessageHandler.cs b/RockLib.Messaging.Tests/FakeMessageHandler.cs index 2c6aec89..d784cf99 100644 --- a/RockLib.Messaging.Tests/FakeMessageHandler.cs +++ b/RockLib.Messaging.Tests/FakeMessageHandler.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Threading.Tasks; namespace RockLib.Messaging.Tests { @@ -6,9 +7,10 @@ public class FakeMessageHandler : IMessageHandler { public List<(IReceiver Receiver, IReceiverMessage Message)> ReceivedMessages { get; } = new List<(IReceiver, IReceiverMessage)>(); - public void OnMessageReceived(IReceiver receiver, IReceiverMessage message) + public Task OnMessageReceivedAsync(IReceiver receiver, IReceiverMessage message) { ReceivedMessages.Add((receiver, message)); + return Task.FromResult(0); } } } diff --git a/RockLib.Messaging.Tests/ForwardingMessageHandlerTests.cs b/RockLib.Messaging.Tests/ForwardingMessageHandlerTests.cs index 5a6348ae..5332095e 100644 --- a/RockLib.Messaging.Tests/ForwardingMessageHandlerTests.cs +++ b/RockLib.Messaging.Tests/ForwardingMessageHandlerTests.cs @@ -1,6 +1,7 @@ using FluentAssertions; using NUnit.Framework; using RockLib.Messaging.Testing; +using System.Threading.Tasks; namespace RockLib.Messaging.Tests { @@ -8,7 +9,7 @@ namespace RockLib.Messaging.Tests public class ForwardingMessageHandlerTests { [Test] - public void OnMessageReceivedCallsInnerHandlerOnMessageReceivedWithForwardingReceiverMessage() + public async Task OnMessageReceivedCallsInnerHandlerOnMessageReceivedWithForwardingReceiverMessage() { var receiver = new FakeReceiver(); var forwardingReceiver = new ForwardingReceiver("foo", receiver); @@ -18,7 +19,7 @@ public void OnMessageReceivedCallsInnerHandlerOnMessageReceivedWithForwardingRec var message = new FakeReceiverMessage("Hello, world!"); - handler.OnMessageReceived(receiver, message); + await handler.OnMessageReceivedAsync(receiver, message); messageHandler.ReceivedMessages.Should().ContainSingle(); messageHandler.ReceivedMessages[0].Receiver.Should().BeSameAs(forwardingReceiver); diff --git a/RockLib.Messaging.Tests/ReceiverExtensionsTests.cs b/RockLib.Messaging.Tests/ReceiverExtensionsTests.cs new file mode 100644 index 00000000..958106e7 --- /dev/null +++ b/RockLib.Messaging.Tests/ReceiverExtensionsTests.cs @@ -0,0 +1,82 @@ +using NUnit.Framework; +using RockLib.Messaging.Testing; +using System.Threading.Tasks; + +namespace RockLib.Messaging.Tests +{ + [TestFixture] + public class ReceiverExtensionsTests + { + [Test] + public async Task TheCallbackPassedToStart1IsInvokedWhenAMessageIsReceived() + { + var receiver = new FakeReceiver(); + + var received = false; + + receiver.Start(async m => + { + received = true; + await m.AcknowledgeAsync(); + }); + + await receiver.MessageHandler.OnMessageReceivedAsync(receiver, new FakeReceiverMessage("Hello, world!")); + + Assert.True(received); + } + + [Test] + public async Task TheCallbackPassedToStart2IsInvokedWhenAMessageIsReceived() + { + var receiver = new FakeReceiver(); + + var received = false; + + receiver.Start(m => + { + received = true; + m.Acknowledge(); + }); + + await receiver.MessageHandler.OnMessageReceivedAsync(receiver, new FakeReceiverMessage("Hello, world!")); + + Assert.True(received); + } + + [Test] + public async Task TheCallbackPassedToStart3IsInvokedWhenAMessageIsReceived() + { + var receiver = new FakeReceiver(); + + var received = false; + + receiver.Start(async (r, m) => + { + received = true; + await m.AcknowledgeAsync(); + }); + + await receiver.MessageHandler.OnMessageReceivedAsync(receiver, new FakeReceiverMessage("Hello, world!")); + + Assert.True(received); + } + + [Test] + public async Task TheCallbackPassedToStart4IsInvokedWhenAMessageIsReceived() + { + var receiver = new FakeReceiver(); + + var received = false; + + receiver.Start((r, m) => + { + received = true; + m.Acknowledge(); + }); + + await receiver.MessageHandler.OnMessageReceivedAsync(receiver, new FakeReceiverMessage("Hello, world!")); + + Assert.True(received); + } + } +} diff --git a/RockLib.Messaging/ForwardingMessageHandler.cs b/RockLib.Messaging/ForwardingMessageHandler.cs index 48912497..75d59528 100644 --- a/RockLib.Messaging/ForwardingMessageHandler.cs +++ b/RockLib.Messaging/ForwardingMessageHandler.cs @@ -1,4 +1,6 @@ -namespace RockLib.Messaging +using System.Threading.Tasks; + +namespace RockLib.Messaging { /// /// A decorator for the interface that forwards @@ -30,7 +32,7 @@ internal ForwardingMessageHandler(ForwardingReceiver forwardingReceiver, IMessag /// /// Handles a received message. /// - /// When invoked, this method invokes the + /// When invoked, this method invokes the /// method of the property. It passes the /// property as the receiver argument and /// a new decorator as the message @@ -39,7 +41,7 @@ internal ForwardingMessageHandler(ForwardingReceiver forwardingReceiver, IMessag /// /// The instance of that received the message. /// The message that was received. - public void OnMessageReceived(IReceiver receiver, IReceiverMessage message) => - MessageHandler.OnMessageReceived(ForwardingReceiver, new ForwardingReceiverMessage(ForwardingReceiver, message)); + public Task OnMessageReceivedAsync(IReceiver receiver, IReceiverMessage message) => + MessageHandler.OnMessageReceivedAsync(ForwardingReceiver, new ForwardingReceiverMessage(ForwardingReceiver, message)); } } diff --git a/RockLib.Messaging/ForwardingReceiver.cs b/RockLib.Messaging/ForwardingReceiver.cs index 97991824..0b6e0138 100644 --- a/RockLib.Messaging/ForwardingReceiver.cs +++ b/RockLib.Messaging/ForwardingReceiver.cs @@ -152,7 +152,7 @@ public ForwardingReceiver(string name, IReceiver receiver, /// /// Gets or sets the message handler for this receiver. When set, the receiver is started - /// and will invoke the value's method + /// and will invoke the value's method /// when messages are received. /// /// When set, this property sets the of the diff --git a/RockLib.Messaging/IMessageHandler.cs b/RockLib.Messaging/IMessageHandler.cs index a1295656..cd5b00a0 100644 --- a/RockLib.Messaging/IMessageHandler.cs +++ b/RockLib.Messaging/IMessageHandler.cs @@ -1,4 +1,6 @@ -namespace RockLib.Messaging +using System.Threading.Tasks; + +namespace RockLib.Messaging { /// /// Defines an object that handles the messages received by an instance @@ -11,6 +13,6 @@ public interface IMessageHandler /// /// The instance of that received the message. /// The message that was received. - void OnMessageReceived(IReceiver receiver, IReceiverMessage message); + Task OnMessageReceivedAsync(IReceiver receiver, IReceiverMessage message); } } \ No newline at end of file diff --git a/RockLib.Messaging/IReceiver.cs b/RockLib.Messaging/IReceiver.cs index 6e19df5a..d13b808c 100644 --- a/RockLib.Messaging/IReceiver.cs +++ b/RockLib.Messaging/IReceiver.cs @@ -15,7 +15,7 @@ public interface IReceiver : IDisposable /// /// Gets or sets the message handler for this receiver. When set, the receiver is started - /// and will invoke the value's method + /// and will invoke the value's method /// when messages are received. /// /// diff --git a/RockLib.Messaging/OnMessageReceivedDelegate.cs b/RockLib.Messaging/OnMessageReceivedDelegate.cs index 05b75022..80d5b691 100644 --- a/RockLib.Messaging/OnMessageReceivedDelegate.cs +++ b/RockLib.Messaging/OnMessageReceivedDelegate.cs @@ -1,9 +1,18 @@ -namespace RockLib.Messaging +using System.Threading.Tasks; + +namespace RockLib.Messaging { /// - /// Defines a function that handles a received message. + /// Defines a synchronous function that handles a received message. /// /// The instance of that received the message. /// The message that was received. public delegate void OnMessageReceivedDelegate(IReceiver receiver, IReceiverMessage message); + + /// + /// Defines an asynchronous function that handles a received message. + /// + /// The instance of that received the message. + /// The message that was received. + public delegate Task OnMessageReceivedAsyncDelegate(IReceiver receiver, IReceiverMessage message); } diff --git a/RockLib.Messaging/Receiver.cs b/RockLib.Messaging/Receiver.cs index 30271d89..6438e7dd 100644 --- a/RockLib.Messaging/Receiver.cs +++ b/RockLib.Messaging/Receiver.cs @@ -26,7 +26,7 @@ protected Receiver(string name) /// /// Gets or sets the message handler for this receiver. When set, the receiver is started - /// and will invoke the value's method + /// and will invoke the value's method /// when messages are received. /// public IMessageHandler MessageHandler diff --git a/RockLib.Messaging/ReceiverExtensions.cs b/RockLib.Messaging/ReceiverExtensions.cs index c3bc7e39..049f3f67 100644 --- a/RockLib.Messaging/ReceiverExtensions.cs +++ b/RockLib.Messaging/ReceiverExtensions.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; namespace RockLib.Messaging { @@ -26,7 +27,29 @@ public static void Start(this IReceiver receiver, Action onMes public static void Start(this IReceiver receiver, OnMessageReceivedDelegate onMessageReceived) { if (onMessageReceived == null) throw new ArgumentNullException(nameof(onMessageReceived)); - receiver.Start(new DelegateMessageHandler(onMessageReceived)); + receiver.Start(new SyncDelegateMessageHandler(onMessageReceived)); + } + + /// + /// Start listening for messages and handle them using the specified callback function. + /// + /// The receiver to start. + /// A function that is invoked when a message is received. + public static void Start(this IReceiver receiver, Func onMessageReceivedAsync) + { + if (onMessageReceivedAsync == null) throw new ArgumentNullException(nameof(onMessageReceivedAsync)); + receiver.Start((_, message) => onMessageReceivedAsync(message)); + } + + /// + /// Start listening for messages and handle them using the specified callback function. + /// + /// The receiver to start. + /// A function that is invoked when a message is received. + public static void Start(this IReceiver receiver, OnMessageReceivedAsyncDelegate onMessageReceivedAsync) + { + if (onMessageReceivedAsync == null) throw new ArgumentNullException(nameof(onMessageReceivedAsync)); + receiver.Start(new AsyncDelegateMessageHandler(onMessageReceivedAsync)); } /// @@ -44,15 +67,40 @@ public static void Start(this IReceiver receiver, IMessageHandler messageHandler receiver.MessageHandler = messageHandler ?? throw new ArgumentNullException(nameof(messageHandler)); } - private class DelegateMessageHandler : IMessageHandler + private class SyncDelegateMessageHandler : IMessageHandler { private readonly OnMessageReceivedDelegate _onMessageReceived; - public DelegateMessageHandler(OnMessageReceivedDelegate onMessageReceived) => + public SyncDelegateMessageHandler(OnMessageReceivedDelegate onMessageReceived) => _onMessageReceived = onMessageReceived; - public void OnMessageReceived(IReceiver receiver, IReceiverMessage message) => - _onMessageReceived(receiver, message); + public Task OnMessageReceivedAsync(IReceiver receiver, IReceiverMessage message) + { + var source = new TaskCompletionSource(); + + try + { + _onMessageReceived(receiver, message); + source.SetResult(0); + } + catch (Exception ex) + { + source.SetException(ex); + } + + return source.Task; + } + } + + private class AsyncDelegateMessageHandler : IMessageHandler + { + private readonly OnMessageReceivedAsyncDelegate _onMessageReceivedAsync; + + public AsyncDelegateMessageHandler(OnMessageReceivedAsyncDelegate onMessageReceivedAsync) => + _onMessageReceivedAsync = onMessageReceivedAsync; + + public Task OnMessageReceivedAsync(IReceiver receiver, IReceiverMessage message) => + _onMessageReceivedAsync(receiver, message); } } } \ No newline at end of file