Skip to content

Commit

Permalink
Merge pull request #37 from RockLib/async
Browse files Browse the repository at this point in the history
Message handler interface is async
  • Loading branch information
JustinMoss committed Dec 19, 2018
2 parents 51a75f8 + 1439b6a commit 749b24f
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 19 deletions.
4 changes: 3 additions & 1 deletion RockLib.Messaging.Tests/FakeMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace RockLib.Messaging.Tests
{
public class FakeMessageHandler : IMessageHandler
{
public List<(IReceiver Receiver, IReceiverMessage Message)> ReceivedMessages { get; } = new List<(IReceiver, IReceiverMessage)>();

public void OnMessageReceived(IReceiver receiver, IReceiverMessage message)
public Task OnMessageReceivedAsync(IReceiver receiver, IReceiverMessage message)
{
ReceivedMessages.Add((receiver, message));
return Task.FromResult(0);
}
}
}
5 changes: 3 additions & 2 deletions RockLib.Messaging.Tests/ForwardingMessageHandlerTests.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
using FluentAssertions;
using NUnit.Framework;
using RockLib.Messaging.Testing;
using System.Threading.Tasks;

namespace RockLib.Messaging.Tests
{
[TestFixture]
public class ForwardingMessageHandlerTests
{
[Test]
public void OnMessageReceivedCallsInnerHandlerOnMessageReceivedWithForwardingReceiverMessage()
public async Task OnMessageReceivedCallsInnerHandlerOnMessageReceivedWithForwardingReceiverMessage()
{
var receiver = new FakeReceiver();
var forwardingReceiver = new ForwardingReceiver("foo", receiver);
Expand All @@ -18,7 +19,7 @@ public void OnMessageReceivedCallsInnerHandlerOnMessageReceivedWithForwardingRec

var message = new FakeReceiverMessage("Hello, world!");

handler.OnMessageReceived(receiver, message);
await handler.OnMessageReceivedAsync(receiver, message);

messageHandler.ReceivedMessages.Should().ContainSingle();
messageHandler.ReceivedMessages[0].Receiver.Should().BeSameAs(forwardingReceiver);
Expand Down
82 changes: 82 additions & 0 deletions RockLib.Messaging.Tests/ReceiverExtensionsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using NUnit.Framework;
using RockLib.Messaging.Testing;
using System.Threading.Tasks;

namespace RockLib.Messaging.Tests
{
[TestFixture]
public class ReceiverExtensionsTests
{
[Test]
public async Task TheCallbackPassedToStart1IsInvokedWhenAMessageIsReceived()
{
var receiver = new FakeReceiver();

var received = false;

receiver.Start(async m =>
{
received = true;
await m.AcknowledgeAsync();
});

await receiver.MessageHandler.OnMessageReceivedAsync(receiver, new FakeReceiverMessage("Hello, world!"));

Assert.True(received);
}

[Test]
public async Task TheCallbackPassedToStart2IsInvokedWhenAMessageIsReceived()
{
var receiver = new FakeReceiver();

var received = false;

receiver.Start(m =>
{
received = true;
m.Acknowledge();
});

await receiver.MessageHandler.OnMessageReceivedAsync(receiver, new FakeReceiverMessage("Hello, world!"));

Assert.True(received);
}

[Test]
public async Task TheCallbackPassedToStart3IsInvokedWhenAMessageIsReceived()
{
var receiver = new FakeReceiver();

var received = false;

receiver.Start(async (r, m) =>
{
received = true;
await m.AcknowledgeAsync();
});

await receiver.MessageHandler.OnMessageReceivedAsync(receiver, new FakeReceiverMessage("Hello, world!"));

Assert.True(received);
}

[Test]
public async Task TheCallbackPassedToStart4IsInvokedWhenAMessageIsReceived()
{
var receiver = new FakeReceiver();

var received = false;

receiver.Start((r, m) =>
{
received = true;
m.Acknowledge();
});

await receiver.MessageHandler.OnMessageReceivedAsync(receiver, new FakeReceiverMessage("Hello, world!"));

Assert.True(received);
}
}
}
10 changes: 6 additions & 4 deletions RockLib.Messaging/ForwardingMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace RockLib.Messaging
using System.Threading.Tasks;

namespace RockLib.Messaging
{
/// <summary>
/// A decorator for the <see cref="IMessageHandler"/> interface that forwards
Expand Down Expand Up @@ -30,7 +32,7 @@ internal ForwardingMessageHandler(ForwardingReceiver forwardingReceiver, IMessag
/// <summary>
/// Handles a received message.
/// <para>
/// When invoked, this method invokes the <see cref="IMessageHandler.OnMessageReceived"/>
/// When invoked, this method invokes the <see cref="IMessageHandler.OnMessageReceivedAsync"/>
/// method of the <see cref="MessageHandler"/> property. It passes the
/// <see cref="ForwardingReceiver"/> property as the <c>receiver</c> argument and
/// a new <see cref="ForwardingReceiverMessage"/> decorator as the <c>message</c>
Expand All @@ -39,7 +41,7 @@ internal ForwardingMessageHandler(ForwardingReceiver forwardingReceiver, IMessag
/// </summary>
/// <param name="receiver">The instance of <see cref="IReceiver"/> that received the message.</param>
/// <param name="message">The message that was received.</param>
public void OnMessageReceived(IReceiver receiver, IReceiverMessage message) =>
MessageHandler.OnMessageReceived(ForwardingReceiver, new ForwardingReceiverMessage(ForwardingReceiver, message));
public Task OnMessageReceivedAsync(IReceiver receiver, IReceiverMessage message) =>
MessageHandler.OnMessageReceivedAsync(ForwardingReceiver, new ForwardingReceiverMessage(ForwardingReceiver, message));
}
}
2 changes: 1 addition & 1 deletion RockLib.Messaging/ForwardingReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public ForwardingReceiver(string name, IReceiver receiver,

/// <summary>
/// Gets or sets the message handler for this receiver. When set, the receiver is started
/// and will invoke the value's <see cref="IMessageHandler.OnMessageReceived"/> method
/// and will invoke the value's <see cref="IMessageHandler.OnMessageReceivedAsync"/> method
/// when messages are received.
/// <para>
/// When set, this property sets the <see cref="IReceiver.MessageHandler"/> of the
Expand Down
6 changes: 4 additions & 2 deletions RockLib.Messaging/IMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace RockLib.Messaging
using System.Threading.Tasks;

namespace RockLib.Messaging
{
/// <summary>
/// Defines an object that handles the messages received by an instance
Expand All @@ -11,6 +13,6 @@ public interface IMessageHandler
/// </summary>
/// <param name="receiver">The instance of <see cref="IReceiver"/> that received the message.</param>
/// <param name="message">The message that was received.</param>
void OnMessageReceived(IReceiver receiver, IReceiverMessage message);
Task OnMessageReceivedAsync(IReceiver receiver, IReceiverMessage message);
}
}
2 changes: 1 addition & 1 deletion RockLib.Messaging/IReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface IReceiver : IDisposable

/// <summary>
/// Gets or sets the message handler for this receiver. When set, the receiver is started
/// and will invoke the value's <see cref="IMessageHandler.OnMessageReceived"/> method
/// and will invoke the value's <see cref="IMessageHandler.OnMessageReceivedAsync"/> method
/// when messages are received.
/// </summary>
/// <remarks>
Expand Down
13 changes: 11 additions & 2 deletions RockLib.Messaging/OnMessageReceivedDelegate.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
namespace RockLib.Messaging
using System.Threading.Tasks;

namespace RockLib.Messaging
{
/// <summary>
/// Defines a function that handles a received message.
/// Defines a synchronous function that handles a received message.
/// </summary>
/// <param name="receiver">The instance of <see cref="IReceiver"/> that received the message.</param>
/// <param name="message">The message that was received.</param>
public delegate void OnMessageReceivedDelegate(IReceiver receiver, IReceiverMessage message);

/// <summary>
/// Defines an asynchronous function that handles a received message.
/// </summary>
/// <param name="receiver">The instance of <see cref="IReceiver"/> that received the message.</param>
/// <param name="message">The message that was received.</param>
public delegate Task OnMessageReceivedAsyncDelegate(IReceiver receiver, IReceiverMessage message);
}
2 changes: 1 addition & 1 deletion RockLib.Messaging/Receiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ protected Receiver(string name)

/// <summary>
/// Gets or sets the message handler for this receiver. When set, the receiver is started
/// and will invoke the value's <see cref="IMessageHandler.OnMessageReceived"/> method
/// and will invoke the value's <see cref="IMessageHandler.OnMessageReceivedAsync"/> method
/// when messages are received.
/// </summary>
public IMessageHandler MessageHandler
Expand Down
58 changes: 53 additions & 5 deletions RockLib.Messaging/ReceiverExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;

namespace RockLib.Messaging
{
Expand Down Expand Up @@ -26,7 +27,29 @@ public static void Start(this IReceiver receiver, Action<IReceiverMessage> onMes
public static void Start(this IReceiver receiver, OnMessageReceivedDelegate onMessageReceived)
{
if (onMessageReceived == null) throw new ArgumentNullException(nameof(onMessageReceived));
receiver.Start(new DelegateMessageHandler(onMessageReceived));
receiver.Start(new SyncDelegateMessageHandler(onMessageReceived));
}

/// <summary>
/// Start listening for messages and handle them using the specified callback function.
/// </summary>
/// <param name="receiver">The receiver to start.</param>
/// <param name="onMessageReceivedAsync">A function that is invoked when a message is received.</param>
public static void Start(this IReceiver receiver, Func<IReceiverMessage, Task> onMessageReceivedAsync)
{
if (onMessageReceivedAsync == null) throw new ArgumentNullException(nameof(onMessageReceivedAsync));
receiver.Start((_, message) => onMessageReceivedAsync(message));
}

/// <summary>
/// Start listening for messages and handle them using the specified callback function.
/// </summary>
/// <param name="receiver">The receiver to start.</param>
/// <param name="onMessageReceivedAsync">A function that is invoked when a message is received.</param>
public static void Start(this IReceiver receiver, OnMessageReceivedAsyncDelegate onMessageReceivedAsync)
{
if (onMessageReceivedAsync == null) throw new ArgumentNullException(nameof(onMessageReceivedAsync));
receiver.Start(new AsyncDelegateMessageHandler(onMessageReceivedAsync));
}

/// <summary>
Expand All @@ -44,15 +67,40 @@ public static void Start(this IReceiver receiver, IMessageHandler messageHandler
receiver.MessageHandler = messageHandler ?? throw new ArgumentNullException(nameof(messageHandler));
}

private class DelegateMessageHandler : IMessageHandler
private class SyncDelegateMessageHandler : IMessageHandler
{
private readonly OnMessageReceivedDelegate _onMessageReceived;

public DelegateMessageHandler(OnMessageReceivedDelegate onMessageReceived) =>
public SyncDelegateMessageHandler(OnMessageReceivedDelegate onMessageReceived) =>
_onMessageReceived = onMessageReceived;

public void OnMessageReceived(IReceiver receiver, IReceiverMessage message) =>
_onMessageReceived(receiver, message);
public Task OnMessageReceivedAsync(IReceiver receiver, IReceiverMessage message)
{
var source = new TaskCompletionSource<int>();

try
{
_onMessageReceived(receiver, message);
source.SetResult(0);
}
catch (Exception ex)
{
source.SetException(ex);
}

return source.Task;
}
}

private class AsyncDelegateMessageHandler : IMessageHandler
{
private readonly OnMessageReceivedAsyncDelegate _onMessageReceivedAsync;

public AsyncDelegateMessageHandler(OnMessageReceivedAsyncDelegate onMessageReceivedAsync) =>
_onMessageReceivedAsync = onMessageReceivedAsync;

public Task OnMessageReceivedAsync(IReceiver receiver, IReceiverMessage message) =>
_onMessageReceivedAsync(receiver, message);
}
}
}

0 comments on commit 749b24f

Please sign in to comment.