Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Remote: log all layers of wrapped messages during errors #6818

Merged
merged 2 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/core/Akka.Remote.Tests/RemotingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Configuration;
using Akka.Event;
using Akka.Remote.Transport;
using Akka.Routing;
using Akka.TestKit;
Expand Down Expand Up @@ -651,6 +652,21 @@ await EventFilter.Exception<OversizedPayloadException>(start: "Discarding oversi
});
}

/// <summary>
/// Validate that we can accurately log wrapped messages that fail to be delivered
/// </summary>
[Fact]
public void Log_Wrapped_messages_that_fail_to_Send()
{
// 2x wrapped message
var wrappedMessage =
new DeadLetter(new ActorSelectionMessage("hit", Array.Empty<SelectionPathElement>(), false), TestActor,
TestActor);

var loggedType = EndpointWriter.LogPossiblyWrappedMessageType(wrappedMessage);
loggedType.Should().Contain("DeadLetter").And.Contain("ActorSelectionMessage").And.Contain("String");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simple unit test to assert that our log formatting helper captures all of the layers correctly.

This test produces the following output in the log:

Akka.Event.DeadLetter-->(Akka.Actor.ActorSelectionMessage-->System.String)

}

[Fact]
public async Task Drop_received_messages_over_payload_size()
{
Expand Down
10 changes: 7 additions & 3 deletions src/core/Akka.Remote.Tests/RemotingTerminatorSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ await EventFilter.Exception<ShutDownAssociation>().ExpectAsync(0,
(await associated.Ask<ActorIdentity>(new Identify("foo"), RemainingOrDefault)).MessageId.ShouldBe("foo");

// terminate the DEPLOYED system
Assert.True(await _sys2.Terminate().AwaitWithTimeout(10.Seconds()), "Expected to terminate within 10 seconds, but didn't.");
await ExpectTerminatedAsync(associated); // expect that the remote deployed actor is dead

await WithinAsync(TimeSpan.FromSeconds(10), async () =>
{
var terminationTask = _sys2.Terminate(); // start termination process
await ExpectTerminatedAsync(associated); // expect that the remote deployed actor is dead
Assert.True(await terminationTask.AwaitWithTimeout(RemainingOrDefault), "Expected to terminate within 10 seconds, but didn't.");
});

// now terminate the DEPLOYER system
Assert.True(await Sys.Terminate().AwaitWithTimeout(10.Seconds()), "Expected to terminate within 10 seconds, but didn't.");
});
Expand Down
42 changes: 39 additions & 3 deletions src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
Expand Down Expand Up @@ -1462,6 +1463,41 @@ private void BecomeWritingOrSendBufferedMessages()
}
}

/// <summary>
/// Unwraps <see cref="IWrappedMessage"/> in order to help make it easier to troubleshoot
/// which oversized message was sent.
/// </summary>
/// <returns>The formatted type string.</returns>
/// <remarks>
/// Internal for testing purposes only.
/// </remarks>
internal static string LogPossiblyWrappedMessageType(object failedMsg)
{
if (failedMsg is IWrappedMessage wrappedMessage)
{
static void LogWrapped(StringBuilder builder, IWrappedMessage nextMsg)
{
builder.Append($"{nextMsg.GetType()}-->");
if (nextMsg.Message is IWrappedMessage wrappedAgain)
{
builder.Append('(');
LogWrapped(builder, wrappedAgain); // recursively iterate through all layers of wrapping
builder.Append(')');
}
else
{
builder.Append(nextMsg.Message.GetType());
}
}

var builder = new StringBuilder();
LogWrapped(builder, wrappedMessage);
return builder.ToString();
}

return failedMsg.GetType().ToString();
}

private bool WriteSend(EndpointManager.Send send)
{
try
Expand All @@ -1486,7 +1522,7 @@ private bool WriteSend(EndpointManager.Send send)
string.Format("Discarding oversized payload sent to {0}: max allowed size {1} bytes, actual size of encoded {2} was {3} bytes.",
send.Recipient,
Transport.MaximumPayloadBytes,
send.Message.GetType(),
LogPossiblyWrappedMessageType(send.Message),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Captured all of the relevant EndpointWriter callsites here - unfortunately, on the EndpointReader side we don't know what the underlying type is because it fails during deserialization, which is where we would extract that kind of information.

pdu.Length));
_log.Error(reason, "Transient association error (association remains live)");
return true;
Expand All @@ -1509,15 +1545,15 @@ private bool WriteSend(EndpointManager.Send send)
_log.Error(
ex,
"Serialization failed for message [{0}]. Transient association error (association remains live)",
send.Message.GetType());
LogPossiblyWrappedMessageType(send.Message));
return true;
}
catch (ArgumentException ex)
{
_log.Error(
ex,
"Serializer threw ArgumentException for message type [{0}]. Transient association error (association remains live)",
send.Message.GetType());
LogPossiblyWrappedMessageType(send.Message));
return true;
}
catch (EndpointException ex)
Expand Down