[WIP] Akka.IO / Akka.Streams: ByteString rewrite #7136
base: dev
Tests are not currently compiling - working on that.
Took a closer look at current state and wanted to add FTT based on some past hacking at the subject.
src/core/Akka.Streams/Dsl/Framing.cs
Outdated
_ => throw new NotSupportedException($"ByteOrder {_stage._byteOrder} is not supported")
};

// TODO: AVOID ARRAY COPYING AGAIN HERE
Is this going to be separate PR?
Going to try to do that here but haven't gotten around to it yet
@@ -132,7 +101,8 @@ public static void SetBuffer(this SocketAsyncEventArgs args, IEnumerable<ByteStr
else
{
args.SetBuffer(null, 0, 0);
args.BufferList = dataCollection.SelectMany(d => d.Buffers).ToList();
// TODO: fix this before we ship
args.BufferList = new List<ArraySegment<byte>>(dataCollection.Select(bs => new ArraySegment<byte>(bs.ToArray())));
Thing that feels very relevant:
Short term, this -screams- pooling scenario and/or IMemoryOwner, even if we have to deal with semantics internally.
I remember hacking something together with one pool or another during my Covid lockdown 'Akka streams remote transport experiment' and it helped a LOT. I think it was somewhere during or just before the hand-written protobuf stuff.
Where 'considerations' get important is the pinning of the arrays underlying the BufferList.
IDK the current pinning 'status', but last I dug into it they all get pinned for a while, and that sucks for the GC.
Thought/suggestion would be, in general, to use a pooled buffer of some form, but also to strongly consider/measure a single final buffer per SetBuffer() call being set on SAEA.
Having a pooled buffer increases the likelihood that it will, long term, survive to an older segment and hopefully avoid pinning costs in Gen0/Gen1. (Also, longer term, we can hopefully #IFDEF said pooled buffers into POH and get more gains.)
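To make the "single pooled buffer per SetBuffer() call" idea concrete, here is a minimal sketch. It is not code from this PR: `PooledSendBuffer` and `RentAndFill` are hypothetical names, and it assumes the payload is available as a list of `ReadOnlyMemory<byte>` segments. Whether this actually beats a `BufferList` would need measuring.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;

public static class PooledSendBuffer
{
    // Hypothetical helper: copies every segment into ONE array rented from the shared pool.
    // The caller owns the returned array and must hand it back with
    // ArrayPool<byte>.Shared.Return(...) once the send has completed.
    public static byte[] RentAndFill(IReadOnlyList<ReadOnlyMemory<byte>> segments, out int length)
    {
        length = 0;
        foreach (var segment in segments)
            length += segment.Length;

        // Rent may return an array LARGER than requested, so the real payload
        // length is reported separately through the out parameter.
        var rented = ArrayPool<byte>.Shared.Rent(length);

        var offset = 0;
        foreach (var segment in segments)
        {
            segment.Span.CopyTo(rented.AsSpan(offset));
            offset += segment.Length;
        }

        return rented;
    }
}
```

The rented array and `length` could then be passed to `args.SetBuffer(rented, 0, length)`, with the array returned to the pool from the completion callback. Returning it too early would let another caller overwrite bytes that are still being sent.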
Rather than go down that road, I'd prefer to rewrite Akka.IO to not depend on SocketAsyncEventArgs and all of the complicated junk inside the TCP and UDP Akka.IO actors. I think some major simplification is in order there for .NET 6 at least.
@@ -132,7 +101,8 @@ public static void SetBuffer(this SocketAsyncEventArgs args, IEnumerable<ByteStr
else
{
args.SetBuffer(null, 0, 0);
Food for thought:
- If the 'buffer' is going to be typically null and we instead normally use .BufferList, it is better to hide this behind an if (args.Buffer != null), since the underlying call has a try/catch.
src/core/Akka/IO/TcpConnection.cs
Outdated
@@ -945,6 +945,7 @@ public override void DoWrite(ConnectionInfo info)
{
try
{
// TODO: avoid use of SocketAsyncEventArgs on newer platforms
Stupid question, are newer APIs to the point where SAEA isn't worth the squeeze? Again it's been a while but last I knew SAEA was still the 'best' way.
src/core/Akka/Util/ByteString.cs
Outdated
@@ -459,53 +382,38 @@ public int IndexOf(byte b, int from)
/// <returns></returns>
public bool HasSubstring(ByteString other, int index)
{
// quick check: if subsequence is longer than remaining size, return false
if (other.Count > _count - index) return false;
if (other.Count == 0) return true; // Empty spans are always "found".
per discord comments, if we can use the 'Contains' extensions that exist in System.Memory, namely public static int IndexOf<T>(this Span<T> span, ReadOnlySpan<T> value) where T : IEquatable<T>, it would be strongly encouraged.
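As a rough illustration of leaning on the System.Memory extensions, the sketch below uses `MemoryExtensions.SequenceEqual` and `MemoryExtensions.IndexOf` on `ReadOnlySpan<byte>`. `ByteSearch`, `MatchesAt` and `IndexOfFrom` are hypothetical names, and the "match exactly at `index`" reading of `HasSubstring` is an assumption rather than something this thread confirms.

```csharp
using System;

public static class ByteSearch
{
    // True when `other` occurs in `source` starting exactly at `index`.
    // An empty `other` matches at any valid index, mirroring the diff above.
    public static bool MatchesAt(ReadOnlySpan<byte> source, ReadOnlySpan<byte> other, int index)
    {
        if (index < 0 || other.Length > source.Length - index) return false;
        return source.Slice(index, other.Length).SequenceEqual(other);
    }

    // Position of the first occurrence of `other` at or after `from`, or -1 if there is none.
    public static int IndexOfFrom(ReadOnlySpan<byte> source, ReadOnlySpan<byte> other, int from)
    {
        if (from < 0 || from > source.Length) return -1;
        var hit = source.Slice(from).IndexOf(other);
        return hit < 0 ? -1 : hit + from;
    }
}
```

Both helpers work on spans, so a `ReadOnlyMemory<byte>`-backed type could call them with `_memory.Span` without allocating.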
return new ByteString(array, count);
// combine the two ReadOnlyMemory<byte> instances
var array = new byte[_memory.Length + other._memory.Length];
possible over-opt:
not sure whether compiler will wind up reading length twice here?
yeah I guess I could use the local variables but I think this is probably fine
return 0;
count = Math.Min(count, Count);
_memory.Span.CopyTo(buffer.AsSpan(index, count));
NIT: IMO at bare minimum this could/should call into public int CopyTo(ref Memory<byte> buffer, int index, int count). But in general the CopyTo overloads can be consolidated.
Additional NIT: Torn as to whether the smaller IL is a benefit compared to lacking bounds checks. (OTOH, this is 'I'd have to sharplab and benchmark to reliably comment'.)
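One thing that may be worth double-checking in the hunk above: `Span<T>.CopyTo` throws an `ArgumentException` when the destination is shorter than the source, so copying the whole `_memory.Span` into a destination of only `count` bytes would fail whenever `count` is smaller than the ByteString's length. A minimal sketch of a bounded copy follows. `CopySketch` and `CopyInto` are hypothetical names, not the PR's API.

```csharp
using System;

public static class CopySketch
{
    // Copies at most `count` bytes from `source` into `buffer`, starting at `index`.
    // Returns the number of bytes actually copied.
    public static int CopyInto(ReadOnlyMemory<byte> source, byte[] buffer, int index, int count)
    {
        count = Math.Min(count, source.Length);

        // Slice the SOURCE down to `count` as well; otherwise CopyTo throws
        // whenever the destination span is shorter than the source span.
        source.Span.Slice(0, count).CopyTo(buffer.AsSpan(index, count));
        return count;
    }
}
```

`buffer.AsSpan(index, count)` still validates `index` and `count` against the array, so the usual bounds checks stay in place.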
}

/// <summary>
/// Copies content of a current <see cref="ByteString"/> into a provided <see cref="Memory{T}"/>
/// <paramref name="buffer"/>
/// </summary>
/// <returns>The number of bytes copied</returns>
[Obsolete("This method will be removed in future versions of Akka.NET.")]
butwhy.jpg?
(also same question for other more optimal overloads compared to the byte[] variant)
It's duplicative - just use the methods on the ReadOnlyMemory<byte> property directly that already do this.
Essentially, I'm trying to lower TCO here. .NET Standard 2.0 will be going bye-bye soon enough. We can get by just using the native System.Memory methods instead.
var array = _memory.ToArray();
stream.Write(array, 0, array.Length);
hmm..
need to sleep on whether chunking could be better.
Note: as mentioned in other ramblings, certain read/write operations on streams and the like with Memory may have an intermediate copy to a magical CLR-internal buffer.
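For reference, a chunked write that skips the `ToArray()` allocation could look roughly like the sketch below. It assumes a target where `Stream.Write(ReadOnlySpan<byte>)` is available (.NET Core 2.1+ / .NET Standard 2.1). `StreamWriteSketch`, `WriteTo` and the default chunk size are illustrative assumptions, not part of the PR.

```csharp
using System;
using System.IO;

public static class StreamWriteSketch
{
    // Writes `memory` to `stream` in slices of at most `chunkSize` bytes,
    // without first materializing the whole payload as a new array.
    public static void WriteTo(ReadOnlyMemory<byte> memory, Stream stream, int chunkSize = 81920)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        var remaining = memory.Span;
        while (!remaining.IsEmpty)
        {
            var take = Math.Min(chunkSize, remaining.Length);
            stream.Write(remaining.Slice(0, take));
            remaining = remaining.Slice(take);
        }
    }
}
```

Streams that do not override the span overload fall back to the base `Stream` implementation, which copies each slice into a pooled array before calling `Write(byte[], int, int)`. Chunking therefore bounds the size of that intermediate copy rather than removing it, which is worth benchmarking before committing to it.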
Let's see how accurately ChatGPT was able to describe the current FSM behavior of Akka.IO's
stateDiagram-v2
[*] --> WaitingForRegistration: Connection established
WaitingForRegistration --> Connected: Register
WaitingForRegistration --> Closing: CloseCommand
WaitingForRegistration --> WaitingForRegistration: ResumeReading / SuspendReading
Connected --> ClosingWithPendingWrite: CloseCommand with pending write
Connected --> PeerSentEOF: PeerClosed (keep open)
Connected --> Closing: CloseCommand
Connected --> Connected: WriteCommand / SocketSent / SuspendReading / ResumeReading
PeerSentEOF --> Closing: CloseCommand
ClosingWithPendingWrite --> Closing: Write completed
Closing --> Closed: Connection closed
Closed --> [*]
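The same transitions can be written down as a small pure function, which makes them easier to check against the real actor code. This is only a transcription of the ChatGPT-generated diagram above, not the actual Akka.IO implementation: the enum and member names are hypothetical, and the `hasPendingWrite` flag is an assumption used to tell the two CloseCommand edges apart.

```csharp
using System;

public enum ConnState { WaitingForRegistration, Connected, PeerSentEOF, ClosingWithPendingWrite, Closing, Closed }

public enum ConnEvent
{
    Register, CloseCommand, PeerClosedKeepOpen, WriteCommand, SocketSent,
    SuspendReading, ResumeReading, WriteCompleted, ConnectionClosed
}

public static class ConnectionFsmSketch
{
    // Returns the next state for (state, evt) as drawn in the diagram;
    // throws for any transition the diagram does not list.
    public static ConnState Next(ConnState state, ConnEvent evt, bool hasPendingWrite = false)
    {
        switch (state)
        {
            case ConnState.WaitingForRegistration:
                if (evt == ConnEvent.Register) return ConnState.Connected;
                if (evt == ConnEvent.CloseCommand) return ConnState.Closing;
                if (evt == ConnEvent.SuspendReading || evt == ConnEvent.ResumeReading) return state;
                break;
            case ConnState.Connected:
                if (evt == ConnEvent.CloseCommand)
                    return hasPendingWrite ? ConnState.ClosingWithPendingWrite : ConnState.Closing;
                if (evt == ConnEvent.PeerClosedKeepOpen) return ConnState.PeerSentEOF;
                if (evt == ConnEvent.WriteCommand || evt == ConnEvent.SocketSent ||
                    evt == ConnEvent.SuspendReading || evt == ConnEvent.ResumeReading) return state;
                break;
            case ConnState.PeerSentEOF:
                if (evt == ConnEvent.CloseCommand) return ConnState.Closing;
                break;
            case ConnState.ClosingWithPendingWrite:
                if (evt == ConnEvent.WriteCompleted) return ConnState.Closing;
                break;
            case ConnState.Closing:
                if (evt == ConnEvent.ConnectionClosed) return ConnState.Closed;
                break;
        }

        throw new InvalidOperationException($"No transition from {state} on {evt}");
    }
}
```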
Honestly, not a bad job ChatGPT ^
Looks like
Gonna need to redo this - stuff like the framing code is phenomenally messy and difficult to reason about. That needs to be refactored and simplified first.
This reverts commit d9c3d98.
Changes
A leftover milestone from v1.5 - we are dragging ByteString kicking and screaming into the .NET Core era with major refactoring, namely replacing all of its internals with ReadOnlyMemory<byte> usage.
Checklist
For significant changes, please ensure that the following have been completed (delete if not relevant):
Latest dev Benchmarks
Include data from the relevant benchmark prior to this change here.
This PR's Benchmarks
Include data from after this change here.