From fe2b8b72f45067426a3cbfbb5d7618c91c507336 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Wed, 3 Jun 2020 11:52:39 -0400 Subject: [PATCH 1/3] Change Http2Connection.StartWriteAsync to use a callback model In all of the places we use StartWriteAsync, it's followed by some amount of synchronous work and then releasing the lock. We can instead pass that synchronous work into StartWriteAsync as a callback. This has a few benefits: 1. If/when StartWriteAsync completes asynchronously, in most of the call sites we don't incur another async method then completing asynchronously and allocating its state machine, or needing to call through another layer of state machines. 2. We can have spans as locals inside of the callbacks as they're not async methods, which lets us reduce the number of times we access Memory.Span. 3. We can more easily experiment with different execution models around invoking the work waiting for the lock. --- .../SocketsHttpHandler/Http2Connection.cs | 315 +++++++++--------- 1 file changed, 157 insertions(+), 158 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs index 9af644504a03e..ba0724e4d9c32 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs @@ -30,8 +30,6 @@ internal sealed partial class Http2Connection : HttpConnectionBase, IDisposable [ThreadStatic] private static string[]? t_headerValues; - private int _currentWriteSize; // as passed to StartWriteAsync - private readonly HPackDecoder _hpackDecoder; private readonly Dictionary _httpStreams; @@ -145,7 +143,7 @@ public async ValueTask SetupAsync() // Send initial connection-level WINDOW_UPDATE FrameHeader.WriteTo(_outgoingBuffer.AvailableSpan, FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, streamId: 0); _outgoingBuffer.Commit(FrameHeader.Size); - BinaryPrimitives.WriteUInt32BigEndian(_outgoingBuffer.AvailableSpan, (ConnectionWindowSize - DefaultInitialWindowSize)); + BinaryPrimitives.WriteUInt32BigEndian(_outgoingBuffer.AvailableSpan, ConnectionWindowSize - DefaultInitialWindowSize); _outgoingBuffer.Commit(4); await _stream.WriteAsync(_outgoingBuffer.ActiveMemory).ConfigureAwait(false); @@ -584,7 +582,7 @@ private void ChangeMaxConcurrentStreams(uint newValue) // The value is provided as a uint. // Limit this to int.MaxValue since the CreditManager implementation only supports singed values. // In practice, we should never reach this value. - int effectiveValue = (newValue > (uint)int.MaxValue ? int.MaxValue : (int)newValue); + int effectiveValue = newValue > (uint)int.MaxValue ? int.MaxValue : (int)newValue; int delta = effectiveValue - _maxConcurrentStreams; _maxConcurrentStreams = effectiveValue; @@ -754,13 +752,16 @@ private void ProcessGoAwayFrame(FrameHeader frameHeader) _incomingBuffer.Discard(frameHeader.PayloadLength); } - internal async Task FlushAsync(CancellationToken cancellationToken = default) - { - await StartWriteAsync(0, cancellationToken).ConfigureAwait(false); - FinishWrite(FlushTiming.Now); - } + internal Task FlushAsync(CancellationToken cancellationToken) => + InvokeLockedAsync(0, this, (thisRef, writeBuffer) => thisRef.FinishWrite(writeBuffer.Length, FlushTiming.Now), null, cancellationToken); - private async ValueTask> StartWriteAsync(int writeBytes, CancellationToken cancellationToken = default) + /// Performs a write operation serialized via the . + /// The number of bytes to be written. + /// The state to pass through to the callbacks. + /// The action to be invoked while the writer lock is held and that actually writes the data to the provided buffer. + /// The action to invoke if waiting for the write lock is canceled. + /// The cancellation token to use while waiting. + private async Task InvokeLockedAsync(int writeBytes, T state, Action> lockedAction, Action? failedAcquireAction = null, CancellationToken cancellationToken = default) { if (NetEventSource.IsEnabled) Trace($"{nameof(writeBytes)}={writeBytes}"); @@ -793,6 +794,7 @@ private async ValueTask> StartWriteAsync(int writeBytes, Cancellati LogExceptions(FlushAsync(cancellationToken: default)); } + failedAcquireAction?.Invoke(state); throw; } Interlocked.Decrement(ref _pendingWriters); @@ -802,10 +804,11 @@ private async ValueTask> StartWriteAsync(int writeBytes, Cancellati if (_abortException != null) { _writerLock.Exit(); + failedAcquireAction?.Invoke(state); ThrowRequestAborted(_abortException); } - // Flush anything necessary, and return back the write buffer to use. + // Flush waiting state, then invoke the supplied action. try { // If there is a pending write that was canceled while in progress, wait for it to complete. @@ -815,58 +818,46 @@ private async ValueTask> StartWriteAsync(int writeBytes, Cancellati _inProgressWrite = null; } - int totalBufferLength = _outgoingBuffer.Capacity; - int activeBufferLength = _outgoingBuffer.ActiveLength; - // If the buffer has already grown to 32k, does not have room for the next request, // and is non-empty, flush the current contents to the wire. - if (totalBufferLength >= UnflushedOutgoingBufferSize && - writeBytes >= totalBufferLength - activeBufferLength && - activeBufferLength > 0) + int totalBufferLength = _outgoingBuffer.Capacity; + if (totalBufferLength >= UnflushedOutgoingBufferSize) { - // We explicitly do not pass cancellationToken here, as this flush impacts more than just this operation. - await new ValueTask(FlushOutgoingBytesAsync()).ConfigureAwait(false); // await ValueTask to minimize number of awaiter fields + int activeBufferLength = _outgoingBuffer.ActiveLength; + if (writeBytes >= totalBufferLength - activeBufferLength && activeBufferLength > 0) + { + // We explicitly do not pass cancellationToken here, as this flush impacts more than just this operation. + await new ValueTask(FlushOutgoingBytesAsync()).ConfigureAwait(false); // await ValueTask to minimize number of awaiter fields + } } + // Invoke the callback with the supplied state and the target write buffer. _outgoingBuffer.EnsureAvailableSpace(writeBytes); - Memory writeBuffer = _outgoingBuffer.AvailableMemory.Slice(0, writeBytes); - _currentWriteSize = writeBytes; - - return writeBuffer; + lockedAction(state, _outgoingBuffer.AvailableMemory.Slice(0, writeBytes)); } - catch + finally { _writerLock.Exit(); - throw; } } /// Flushes buffered bytes to the wire. + /// The number of bytes written that are being flushed. /// When a flush should be performed for this write. /// /// Writes here need to be atomic, so as to avoid killing the whole connection. - /// Callers must hold the write lock, which this will release. + /// Callers must hold the write lock. /// - private void FinishWrite(FlushTiming flush) + private void FinishWrite(int bytesWritten, FlushTiming flush) { - if (NetEventSource.IsEnabled) Trace($"{nameof(flush)}={flush}"); - // We can't validate that we hold the mutex, but we can at least validate that someone is holding it. Debug.Assert(_writerLock.IsHeld); - _outgoingBuffer.Commit(_currentWriteSize); - _lastPendingWriterShouldFlush |= (flush == FlushTiming.AfterPendingWrites); - EndWrite(forceFlush: (flush == FlushTiming.Now)); - } - - private void CancelWrite() - { - if (NetEventSource.IsEnabled) Trace(""); - - // We can't validate that we hold the mutex, but we can at least validate that someone is holding it. - Debug.Assert(_writerLock.IsHeld); + if (NetEventSource.IsEnabled) Trace($"{nameof(flush)}={flush}"); - EndWrite(forceFlush: false); + _outgoingBuffer.Commit(bytesWritten); + _lastPendingWriterShouldFlush |= flush == FlushTiming.AfterPendingWrites; + EndWrite(forceFlush: flush == FlushTiming.Now); } private void EndWrite(bool forceFlush) @@ -874,58 +865,58 @@ private void EndWrite(bool forceFlush) // We can't validate that we hold the mutex, but we can at least validate that someone is holding it. Debug.Assert(_writerLock.IsHeld); - try + // We must flush if the caller requires it or if this or a recent frame wanted to be flushed + // once there were no more pending writers that themselves could have forced the flush. + if (forceFlush || (_pendingWriters == 0 && _lastPendingWriterShouldFlush)) { - // We must flush if the caller requires it or if this or a recent frame wanted to be flushed - // once there were no more pending writers that themselves could have forced the flush. - if (forceFlush || (_pendingWriters == 0 && _lastPendingWriterShouldFlush)) + Debug.Assert(_inProgressWrite == null); + if (_outgoingBuffer.ActiveLength > 0) { - Debug.Assert(_inProgressWrite == null); - if (_outgoingBuffer.ActiveLength > 0) - { - _inProgressWrite = FlushOutgoingBytesAsync(); - } + _inProgressWrite = FlushOutgoingBytesAsync(); } } - finally - { - _writerLock.Exit(); - } } - private async Task SendSettingsAckAsync() - { - Memory writeBuffer = await StartWriteAsync(FrameHeader.Size).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace("Started writing."); + private Task SendSettingsAckAsync() => + InvokeLockedAsync(FrameHeader.Size, this, (thisRef, writeBuffer) => + { + if (NetEventSource.IsEnabled) thisRef.Trace("Started writing."); - FrameHeader.WriteTo(writeBuffer.Span, 0, FrameType.Settings, FrameFlags.Ack, streamId: 0); + Span span = writeBuffer.Span; - FinishWrite(FlushTiming.AfterPendingWrites); - } + FrameHeader.WriteTo(span, 0, FrameType.Settings, FrameFlags.Ack, streamId: 0); + + FinishWrite(span.Length, FlushTiming.AfterPendingWrites); + }); /// The 8-byte ping content to send, read as a big-endian integer. - private async Task SendPingAckAsync(long pingContent) - { - Memory writeBuffer = await StartWriteAsync(FrameHeader.Size + FrameHeader.PingLength).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace("Started writing."); + private Task SendPingAckAsync(long pingContent) => + InvokeLockedAsync(FrameHeader.Size + FrameHeader.PingLength, (thisRef: this, pingContent), (state, writeBuffer) => + { + if (NetEventSource.IsEnabled) state.thisRef.Trace("Started writing."); - Debug.Assert(sizeof(long) == FrameHeader.PingLength); - FrameHeader.WriteTo(writeBuffer.Span, FrameHeader.PingLength, FrameType.Ping, FrameFlags.Ack, streamId: 0); - BinaryPrimitives.WriteInt64BigEndian(writeBuffer.Span.Slice(FrameHeader.Size), pingContent); + Debug.Assert(sizeof(long) == FrameHeader.PingLength); - FinishWrite(FlushTiming.AfterPendingWrites); - } + Span span = writeBuffer.Span; - private async Task SendRstStreamAsync(int streamId, Http2ProtocolErrorCode errorCode) - { - Memory writeBuffer = await StartWriteAsync(FrameHeader.Size + FrameHeader.RstStreamLength).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace(streamId, $"Started writing. {nameof(errorCode)}={errorCode}"); + FrameHeader.WriteTo(span, FrameHeader.PingLength, FrameType.Ping, FrameFlags.Ack, streamId: 0); + BinaryPrimitives.WriteInt64BigEndian(span.Slice(FrameHeader.Size), state.pingContent); - FrameHeader.WriteTo(writeBuffer.Span, FrameHeader.RstStreamLength, FrameType.RstStream, FrameFlags.None, streamId); - BinaryPrimitives.WriteInt32BigEndian(writeBuffer.Span.Slice(FrameHeader.Size), (int)errorCode); + state.thisRef.FinishWrite(span.Length, FlushTiming.AfterPendingWrites); + }); - FinishWrite(FlushTiming.Now); // ensure cancellation is seen as soon as possible - } + private Task SendRstStreamAsync(int streamId, Http2ProtocolErrorCode errorCode) => + InvokeLockedAsync(FrameHeader.Size + FrameHeader.RstStreamLength, (thisRef: this, streamId, errorCode), (s, writeBuffer) => + { + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(s.errorCode)}={s.errorCode}"); + + Span span = writeBuffer.Span; + + FrameHeader.WriteTo(span, FrameHeader.RstStreamLength, FrameType.RstStream, FrameFlags.None, s.streamId); + BinaryPrimitives.WriteInt32BigEndian(span.Slice(FrameHeader.Size), (int)s.errorCode); + + s.thisRef.FinishWrite(span.Length, FlushTiming.Now); // ensure cancellation is seen as soon as possible + }); private static (ReadOnlyMemory first, ReadOnlyMemory rest) SplitBuffer(ReadOnlyMemory buffer, int maxSize) => buffer.Length > maxSize ? @@ -1245,63 +1236,67 @@ private async ValueTask SendHeadersAsync(HttpRequestMessage request // Start the write. This serializes access to write to the connection, and ensures that HEADERS // and CONTINUATION frames stay together, as they must do. We use the lock as well to ensure new // streams are created and started in order. - Memory writeBuffer = await StartWriteAsync(totalSize, cancellationToken).ConfigureAwait(false); - try + await InvokeLockedAsync(totalSize, (thisRef: this, http2Stream, current, remaining, totalSize, flags, mustFlush), (s, writeBuffer) => { - // Allocate the next available stream ID. Note that if we fail before sending the headers, - // we'll just skip this stream ID, which is fine. - lock (SyncObject) + try { - if (_nextStream == MaxStreamId || _disposed || _lastStreamId != -1) + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.http2Stream.StreamId, $"Started writing. {nameof(s.totalSize)}={s.totalSize}"); + + // Allocate the next available stream ID. Note that if we fail before sending the headers, + // we'll just skip this stream ID, which is fine. + lock (s.thisRef.SyncObject) { - // We ran out of stream IDs or we raced between acquiring the connection from the pool and shutting down. - // Throw a retryable request exception. This will cause retry logic to kick in - // and perform another connection attempt. The user should never see this exception. - ThrowShutdownException(); + if (s.thisRef._nextStream == MaxStreamId || s.thisRef._disposed || s.thisRef._lastStreamId != -1) + { + // We ran out of stream IDs or we raced between acquiring the connection from the pool and shutting down. + // Throw a retryable request exception. This will cause retry logic to kick in + // and perform another connection attempt. The user should never see this exception. + s.thisRef.ThrowShutdownException(); + } + + // Client-initiated streams are always odd-numbered, so increase by 2. + s.http2Stream.StreamId = s.thisRef._nextStream; + s.thisRef._nextStream += 2; + + // We're about to flush the HEADERS frame, so add the stream to the dictionary now. + // The lifetime of the stream is now controlled by the stream itself and the connection. + // This can fail if the connection is shutting down, in which case we will cancel sending this frame. + s.thisRef._httpStreams.Add(s.http2Stream.StreamId, s.http2Stream); } - // Client-initiated streams are always odd-numbered, so increase by 2. - http2Stream.StreamId = _nextStream; - _nextStream += 2; + Span span = writeBuffer.Span; - // We're about to flush the HEADERS frame, so add the stream to the dictionary now. - // The lifetime of the stream is now controlled by the stream itself and the connection. - // This can fail if the connection is shutting down, in which case we will cancel sending this frame. - _httpStreams.Add(http2Stream.StreamId, http2Stream); - } + // Copy the HEADERS frame. + FrameHeader.WriteTo(span, s.current.Length, FrameType.Headers, s.flags, s.http2Stream.StreamId); + span = span.Slice(FrameHeader.Size); + s.current.Span.CopyTo(span); + span = span.Slice(s.current.Length); + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.http2Stream.StreamId, $"Wrote HEADERS frame. Length={s.current.Length}, flags={s.flags}"); - if (NetEventSource.IsEnabled) Trace(http2Stream.StreamId, $"Started writing. {nameof(totalSize)}={totalSize}"); + // Copy CONTINUATION frames, if any. + while (s.remaining.Length > 0) + { + (s.current, s.remaining) = SplitBuffer(s.remaining, FrameHeader.MaxPayloadLength); + s.flags = s.remaining.Length == 0 ? FrameFlags.EndHeaders : FrameFlags.None; + + FrameHeader.WriteTo(span, s.current.Length, FrameType.Continuation, s.flags, s.http2Stream.StreamId); + span = span.Slice(FrameHeader.Size); + s.current.Span.CopyTo(span); + span = span.Slice(s.current.Length); + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.http2Stream.StreamId, $"Wrote CONTINUATION frame. Length={s.current.Length}, flags={s.flags}"); + } - // Copy the HEADERS frame. - FrameHeader.WriteTo(writeBuffer.Span, current.Length, FrameType.Headers, flags, http2Stream.StreamId); - writeBuffer = writeBuffer.Slice(FrameHeader.Size); - current.CopyTo(writeBuffer); - writeBuffer = writeBuffer.Slice(current.Length); - if (NetEventSource.IsEnabled) Trace(http2Stream.StreamId, $"Wrote HEADERS frame. Length={current.Length}, flags={flags}"); + Debug.Assert(span.Length == 0); - // Copy CONTINUATION frames, if any. - while (remaining.Length > 0) + s.thisRef.FinishWrite(writeBuffer.Length, s.mustFlush || (s.flags & FrameFlags.EndStream) != 0 ? FlushTiming.AfterPendingWrites : FlushTiming.Eventually); + } + catch { - (current, remaining) = SplitBuffer(remaining, FrameHeader.MaxPayloadLength); - flags = remaining.Length == 0 ? FrameFlags.EndHeaders : FrameFlags.None; - - FrameHeader.WriteTo(writeBuffer.Span, current.Length, FrameType.Continuation, flags, http2Stream.StreamId); - writeBuffer = writeBuffer.Slice(FrameHeader.Size); - current.CopyTo(writeBuffer); - writeBuffer = writeBuffer.Slice(current.Length); - if (NetEventSource.IsEnabled) Trace(http2Stream.StreamId, $"Wrote CONTINUATION frame. Length={current.Length}, flags={flags}"); + s.thisRef.EndWrite(forceFlush: false); + throw; } - - Debug.Assert(writeBuffer.Length == 0); - - FinishWrite(mustFlush || (flags & FrameFlags.EndStream) != 0 ? FlushTiming.AfterPendingWrites : FlushTiming.Eventually); - return http2Stream; - } - catch - { - CancelWrite(); - throw; - } + }).ConfigureAwait(false); + return http2Stream; } catch { @@ -1320,57 +1315,62 @@ private async Task SendStreamDataAsync(int streamId, ReadOnlyMemory buffer while (remaining.Length > 0) { - int frameSize = Math.Min(remaining.Length, FrameHeader.MaxPayloadLength); - // Once credit had been granted, we want to actually consume those bytes. + int frameSize = Math.Min(remaining.Length, FrameHeader.MaxPayloadLength); frameSize = await _connectionWindow.RequestCreditAsync(frameSize, cancellationToken).ConfigureAwait(false); ReadOnlyMemory current; (current, remaining) = SplitBuffer(remaining, frameSize); - // It's possible that a cancellation will occur while we wait for the write lock. In that case, we need to - // return the credit that we have acquired and don't plan to use. - Memory writeBuffer; - try - { - writeBuffer = await StartWriteAsync(FrameHeader.Size + current.Length, cancellationToken).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace(streamId, $"Started writing. {nameof(writeBuffer.Length)}={writeBuffer.Length}"); - } - catch - { - _connectionWindow.AdjustCredit(frameSize); - throw; - } + await InvokeLockedAsync( + FrameHeader.Size + current.Length, + (thisRef: this, streamId, frameSize, current), + (s, writeBuffer) => + { + // Invoked while holding the lock: + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(writeBuffer.Length)}={writeBuffer.Length}"); - FrameHeader.WriteTo(writeBuffer.Span, current.Length, FrameType.Data, FrameFlags.None, streamId); - current.CopyTo(writeBuffer.Slice(FrameHeader.Size)); + FrameHeader.WriteTo(writeBuffer.Span, s.current.Length, FrameType.Data, FrameFlags.None, s.streamId); + s.current.CopyTo(writeBuffer.Slice(FrameHeader.Size)); - FinishWrite(FlushTiming.Eventually); // no need to flush, as the request content may do so explicitly, or worst case we'll do so as part of the end data frame + s.thisRef.FinishWrite(writeBuffer.Length, FlushTiming.Eventually); // no need to flush, as the request content may do so explicitly, or worst case we'll do so as part of the end data frame + }, + s => + { + // Invoked if waiting for the lock is canceled (in that case, we need to return the credit that we have acquired and don't plan to use): + s.thisRef._connectionWindow.AdjustCredit(s.frameSize); + }, + cancellationToken).ConfigureAwait(false); } } - private async Task SendEndStreamAsync(int streamId) - { - Memory writeBuffer = await StartWriteAsync(FrameHeader.Size).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace(streamId, "Started writing."); + private Task SendEndStreamAsync(int streamId) => + InvokeLockedAsync(FrameHeader.Size, (thisRef: this, streamId), (s, writeBuffer) => + { + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, "Started writing."); - FrameHeader.WriteTo(writeBuffer.Span, 0, FrameType.Data, FrameFlags.EndStream, streamId); + Span span = writeBuffer.Span; - FinishWrite(FlushTiming.AfterPendingWrites); // finished sending request body, so flush soon (but ok to wait for pending packets) - } + FrameHeader.WriteTo(span, 0, FrameType.Data, FrameFlags.EndStream, s.streamId); - private async Task SendWindowUpdateAsync(int streamId, int amount) + s.thisRef.FinishWrite(span.Length, FlushTiming.AfterPendingWrites); // finished sending request body, so flush soon (but ok to wait for pending packets) + }); + + private Task SendWindowUpdateAsync(int streamId, int amount) { + // We update both the connection-level and stream-level windows at the same time Debug.Assert(amount > 0); + return InvokeLockedAsync(FrameHeader.Size + FrameHeader.WindowUpdateLength, (thisRef: this, streamId, amount), (s, writeBuffer) => + { + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(s.amount)}={s.amount}"); - // We update both the connection-level and stream-level windows at the same time - Memory writeBuffer = await StartWriteAsync(FrameHeader.Size + FrameHeader.WindowUpdateLength).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace(streamId, $"Started writing. {nameof(amount)}={amount}"); + Span span = writeBuffer.Span; - FrameHeader.WriteTo(writeBuffer.Span, FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, streamId); - BinaryPrimitives.WriteInt32BigEndian(writeBuffer.Span.Slice(FrameHeader.Size), amount); + FrameHeader.WriteTo(span, FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, s.streamId); + BinaryPrimitives.WriteInt32BigEndian(span.Slice(FrameHeader.Size), s.amount); - FinishWrite(FlushTiming.Now); // make sure window updates are seen as soon as possible + s.thisRef.FinishWrite(span.Length, FlushTiming.Now); // make sure window updates are seen as soon as possible + }); } private void ExtendWindow(int amount) @@ -1428,7 +1428,6 @@ private void Abort(Exception abortException) public bool IsExpired(long nowTicks, TimeSpan connectionLifetime, TimeSpan connectionIdleTimeout) - { if (_disposed) { @@ -1440,7 +1439,7 @@ public bool IsExpired(long nowTicks, (_httpStreams.Count == 0) && ((nowTicks - _idleSinceTickCount) > connectionIdleTimeout.TotalMilliseconds)) { - if (NetEventSource.IsEnabled) Trace($"Connection no longer usable. Idle {TimeSpan.FromMilliseconds((nowTicks - _idleSinceTickCount))} > {connectionIdleTimeout}."); + if (NetEventSource.IsEnabled) Trace($"Connection no longer usable. Idle {TimeSpan.FromMilliseconds(nowTicks - _idleSinceTickCount)} > {connectionIdleTimeout}."); return true; } From 4395ed6827e1d1adb20e03250cbd85cfc87af718 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Wed, 3 Jun 2020 11:57:36 -0400 Subject: [PATCH 2/3] Tweak Active/AvailableMemory properties/methods --- src/libraries/Common/src/System/Net/ArrayBuffer.cs | 10 ++++++---- .../Net/Http/SocketsHttpHandler/Http2Connection.cs | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/libraries/Common/src/System/Net/ArrayBuffer.cs b/src/libraries/Common/src/System/Net/ArrayBuffer.cs index fc0bbeb099bfd..449060c8a2425 100644 --- a/src/libraries/Common/src/System/Net/ArrayBuffer.cs +++ b/src/libraries/Common/src/System/Net/ArrayBuffer.cs @@ -57,11 +57,13 @@ public void Dispose() public int ActiveLength => _availableStart - _activeStart; public Span ActiveSpan => new Span(_bytes, _activeStart, _availableStart - _activeStart); - public ReadOnlySpan ActiveReadOnlySpan => new ReadOnlySpan(_bytes, _activeStart, _availableStart - _activeStart); - public int AvailableLength => _bytes.Length - _availableStart; - public Span AvailableSpan => new Span(_bytes, _availableStart, AvailableLength); public Memory ActiveMemory => new Memory(_bytes, _activeStart, _availableStart - _activeStart); - public Memory AvailableMemory => new Memory(_bytes, _availableStart, _bytes.Length - _availableStart); + + public int AvailableLength => _bytes.Length - _availableStart; + public Span AvailableSpan => _bytes.AsSpan(_availableStart); + public Memory AvailableMemory => _bytes.AsMemory(_availableStart); + public Memory AvailableMemorySliced(int length) => new Memory(_bytes, _availableStart, length); + public int Capacity => _bytes.Length; diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs index ba0724e4d9c32..a72f8c8023e45 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs @@ -833,7 +833,7 @@ private async Task InvokeLockedAsync(int writeBytes, T state, Action Date: Wed, 3 Jun 2020 16:25:11 -0400 Subject: [PATCH 3/3] Address PR feedback --- .../Common/src/System/Net/ArrayBuffer.cs | 2 +- .../SocketsHttpHandler/Http2Connection.cs | 96 +++++++------------ 2 files changed, 36 insertions(+), 62 deletions(-) diff --git a/src/libraries/Common/src/System/Net/ArrayBuffer.cs b/src/libraries/Common/src/System/Net/ArrayBuffer.cs index 449060c8a2425..6c536d03267a0 100644 --- a/src/libraries/Common/src/System/Net/ArrayBuffer.cs +++ b/src/libraries/Common/src/System/Net/ArrayBuffer.cs @@ -57,6 +57,7 @@ public void Dispose() public int ActiveLength => _availableStart - _activeStart; public Span ActiveSpan => new Span(_bytes, _activeStart, _availableStart - _activeStart); + public ReadOnlySpan ActiveReadOnlySpan => new ReadOnlySpan(_bytes, _activeStart, _availableStart - _activeStart); public Memory ActiveMemory => new Memory(_bytes, _activeStart, _availableStart - _activeStart); public int AvailableLength => _bytes.Length - _availableStart; @@ -64,7 +65,6 @@ public void Dispose() public Memory AvailableMemory => _bytes.AsMemory(_availableStart); public Memory AvailableMemorySliced(int length) => new Memory(_bytes, _availableStart, length); - public int Capacity => _bytes.Length; public void Discard(int byteCount) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs index a72f8c8023e45..846ed88bc39e6 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs @@ -753,15 +753,14 @@ private void ProcessGoAwayFrame(FrameHeader frameHeader) } internal Task FlushAsync(CancellationToken cancellationToken) => - InvokeLockedAsync(0, this, (thisRef, writeBuffer) => thisRef.FinishWrite(writeBuffer.Length, FlushTiming.Now), null, cancellationToken); + PerformWriteAsync(0, 0, (_, __) => FlushTiming.Now, cancellationToken); /// Performs a write operation serialized via the . /// The number of bytes to be written. /// The state to pass through to the callbacks. /// The action to be invoked while the writer lock is held and that actually writes the data to the provided buffer. - /// The action to invoke if waiting for the write lock is canceled. /// The cancellation token to use while waiting. - private async Task InvokeLockedAsync(int writeBytes, T state, Action> lockedAction, Action? failedAcquireAction = null, CancellationToken cancellationToken = default) + private async Task PerformWriteAsync(int writeBytes, T state, Func, FlushTiming> lockedAction, CancellationToken cancellationToken = default) { if (NetEventSource.IsEnabled) Trace($"{nameof(writeBytes)}={writeBytes}"); @@ -794,7 +793,6 @@ private async Task InvokeLockedAsync(int writeBytes, T state, Action(int writeBytes, T state, Action(int writeBytes, T state, Action(int writeBytes, T state, ActionFlushes buffered bytes to the wire. - /// The number of bytes written that are being flushed. - /// When a flush should be performed for this write. - /// - /// Writes here need to be atomic, so as to avoid killing the whole connection. - /// Callers must hold the write lock. - /// - private void FinishWrite(int bytesWritten, FlushTiming flush) - { - // We can't validate that we hold the mutex, but we can at least validate that someone is holding it. - Debug.Assert(_writerLock.IsHeld); - - if (NetEventSource.IsEnabled) Trace($"{nameof(flush)}={flush}"); - - _outgoingBuffer.Commit(bytesWritten); - _lastPendingWriterShouldFlush |= flush == FlushTiming.AfterPendingWrites; - EndWrite(forceFlush: flush == FlushTiming.Now); - } - private void EndWrite(bool forceFlush) { // We can't validate that we hold the mutex, but we can at least validate that someone is holding it. @@ -878,44 +861,40 @@ private void EndWrite(bool forceFlush) } private Task SendSettingsAckAsync() => - InvokeLockedAsync(FrameHeader.Size, this, (thisRef, writeBuffer) => + PerformWriteAsync(FrameHeader.Size, this, (thisRef, writeBuffer) => { if (NetEventSource.IsEnabled) thisRef.Trace("Started writing."); - Span span = writeBuffer.Span; + FrameHeader.WriteTo(writeBuffer.Span, 0, FrameType.Settings, FrameFlags.Ack, streamId: 0); - FrameHeader.WriteTo(span, 0, FrameType.Settings, FrameFlags.Ack, streamId: 0); - - FinishWrite(span.Length, FlushTiming.AfterPendingWrites); + return FlushTiming.AfterPendingWrites; }); /// The 8-byte ping content to send, read as a big-endian integer. private Task SendPingAckAsync(long pingContent) => - InvokeLockedAsync(FrameHeader.Size + FrameHeader.PingLength, (thisRef: this, pingContent), (state, writeBuffer) => + PerformWriteAsync(FrameHeader.Size + FrameHeader.PingLength, (thisRef: this, pingContent), (state, writeBuffer) => { if (NetEventSource.IsEnabled) state.thisRef.Trace("Started writing."); Debug.Assert(sizeof(long) == FrameHeader.PingLength); Span span = writeBuffer.Span; - FrameHeader.WriteTo(span, FrameHeader.PingLength, FrameType.Ping, FrameFlags.Ack, streamId: 0); BinaryPrimitives.WriteInt64BigEndian(span.Slice(FrameHeader.Size), state.pingContent); - state.thisRef.FinishWrite(span.Length, FlushTiming.AfterPendingWrites); + return FlushTiming.AfterPendingWrites; }); private Task SendRstStreamAsync(int streamId, Http2ProtocolErrorCode errorCode) => - InvokeLockedAsync(FrameHeader.Size + FrameHeader.RstStreamLength, (thisRef: this, streamId, errorCode), (s, writeBuffer) => + PerformWriteAsync(FrameHeader.Size + FrameHeader.RstStreamLength, (thisRef: this, streamId, errorCode), (s, writeBuffer) => { if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(s.errorCode)}={s.errorCode}"); Span span = writeBuffer.Span; - FrameHeader.WriteTo(span, FrameHeader.RstStreamLength, FrameType.RstStream, FrameFlags.None, s.streamId); BinaryPrimitives.WriteInt32BigEndian(span.Slice(FrameHeader.Size), (int)s.errorCode); - s.thisRef.FinishWrite(span.Length, FlushTiming.Now); // ensure cancellation is seen as soon as possible + return FlushTiming.Now; // ensure cancellation is seen as soon as possible }); private static (ReadOnlyMemory first, ReadOnlyMemory rest) SplitBuffer(ReadOnlyMemory buffer, int maxSize) => @@ -1236,7 +1215,7 @@ private async ValueTask SendHeadersAsync(HttpRequestMessage request // Start the write. This serializes access to write to the connection, and ensures that HEADERS // and CONTINUATION frames stay together, as they must do. We use the lock as well to ensure new // streams are created and started in order. - await InvokeLockedAsync(totalSize, (thisRef: this, http2Stream, current, remaining, totalSize, flags, mustFlush), (s, writeBuffer) => + await PerformWriteAsync(totalSize, (thisRef: this, http2Stream, current, remaining, totalSize, flags, mustFlush), (s, writeBuffer) => { try { @@ -1288,14 +1267,14 @@ await InvokeLockedAsync(totalSize, (thisRef: this, http2Stream, current, remaini Debug.Assert(span.Length == 0); - s.thisRef.FinishWrite(writeBuffer.Length, s.mustFlush || (s.flags & FrameFlags.EndStream) != 0 ? FlushTiming.AfterPendingWrites : FlushTiming.Eventually); + return s.mustFlush || (s.flags & FrameFlags.EndStream) != 0 ? FlushTiming.AfterPendingWrites : FlushTiming.Eventually; } catch { s.thisRef.EndWrite(forceFlush: false); throw; } - }).ConfigureAwait(false); + }, cancellationToken).ConfigureAwait(false); return http2Stream; } catch @@ -1321,11 +1300,9 @@ private async Task SendStreamDataAsync(int streamId, ReadOnlyMemory buffer ReadOnlyMemory current; (current, remaining) = SplitBuffer(remaining, frameSize); - - await InvokeLockedAsync( - FrameHeader.Size + current.Length, - (thisRef: this, streamId, frameSize, current), - (s, writeBuffer) => + try + { + await PerformWriteAsync(FrameHeader.Size + current.Length, (thisRef: this, streamId, current), (s, writeBuffer) => { // Invoked while holding the lock: if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(writeBuffer.Length)}={writeBuffer.Length}"); @@ -1333,43 +1310,41 @@ await InvokeLockedAsync( FrameHeader.WriteTo(writeBuffer.Span, s.current.Length, FrameType.Data, FrameFlags.None, s.streamId); s.current.CopyTo(writeBuffer.Slice(FrameHeader.Size)); - s.thisRef.FinishWrite(writeBuffer.Length, FlushTiming.Eventually); // no need to flush, as the request content may do so explicitly, or worst case we'll do so as part of the end data frame - }, - s => - { - // Invoked if waiting for the lock is canceled (in that case, we need to return the credit that we have acquired and don't plan to use): - s.thisRef._connectionWindow.AdjustCredit(s.frameSize); - }, - cancellationToken).ConfigureAwait(false); + return FlushTiming.Eventually; // no need to flush, as the request content may do so explicitly, or worst case we'll do so as part of the end data frame + }, cancellationToken).ConfigureAwait(false); + } + catch + { + // Invoked if waiting for the lock is canceled (in that case, we need to return the credit that we have acquired and don't plan to use): + _connectionWindow.AdjustCredit(frameSize); + throw; + } } } private Task SendEndStreamAsync(int streamId) => - InvokeLockedAsync(FrameHeader.Size, (thisRef: this, streamId), (s, writeBuffer) => + PerformWriteAsync(FrameHeader.Size, (thisRef: this, streamId), (s, writeBuffer) => { if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, "Started writing."); - Span span = writeBuffer.Span; - - FrameHeader.WriteTo(span, 0, FrameType.Data, FrameFlags.EndStream, s.streamId); + FrameHeader.WriteTo(writeBuffer.Span, 0, FrameType.Data, FrameFlags.EndStream, s.streamId); - s.thisRef.FinishWrite(span.Length, FlushTiming.AfterPendingWrites); // finished sending request body, so flush soon (but ok to wait for pending packets) + return FlushTiming.AfterPendingWrites; // finished sending request body, so flush soon (but ok to wait for pending packets) }); private Task SendWindowUpdateAsync(int streamId, int amount) { // We update both the connection-level and stream-level windows at the same time Debug.Assert(amount > 0); - return InvokeLockedAsync(FrameHeader.Size + FrameHeader.WindowUpdateLength, (thisRef: this, streamId, amount), (s, writeBuffer) => + return PerformWriteAsync(FrameHeader.Size + FrameHeader.WindowUpdateLength, (thisRef: this, streamId, amount), (s, writeBuffer) => { if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(s.amount)}={s.amount}"); Span span = writeBuffer.Span; - FrameHeader.WriteTo(span, FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, s.streamId); BinaryPrimitives.WriteInt32BigEndian(span.Slice(FrameHeader.Size), s.amount); - s.thisRef.FinishWrite(span.Length, FlushTiming.Now); // make sure window updates are seen as soon as possible + return FlushTiming.Now; // make sure window updates are seen as soon as possible }); } @@ -1424,7 +1399,6 @@ private void Abort(Exception abortException) /// terminate it, which would be considered a failure, so this race condition is largely benign and inherent to /// the nature of connection pooling. /// - public bool IsExpired(long nowTicks, TimeSpan connectionLifetime, TimeSpan connectionIdleTimeout)