Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send connection WINDOW_UPDATE before RTT PING #97881

Merged
merged 11 commits into from
Feb 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ private async ValueTask ProcessHeadersFrame(FrameHeader frameHeader)
if (http2Stream != null)
{
http2Stream.OnHeadersStart();
_rttEstimator.OnDataOrHeadersReceived(this);
_rttEstimator.OnDataOrHeadersReceived(this, sendWindowUpdateBeforePing: true);
headersHandler = http2Stream;
}
else
Expand Down Expand Up @@ -766,21 +766,16 @@ private void ProcessDataFrame(FrameHeader frameHeader)

ReadOnlySpan<byte> frameData = GetFrameData(_incomingBuffer.ActiveSpan.Slice(0, frameHeader.PayloadLength), hasPad: frameHeader.PaddedFlag, hasPriority: false);

if (http2Stream != null)
{
bool endStream = frameHeader.EndStreamFlag;

http2Stream.OnResponseData(frameData, endStream);

if (!endStream && frameData.Length > 0)
{
_rttEstimator.OnDataOrHeadersReceived(this);
}
}
bool endStream = frameHeader.EndStreamFlag;
http2Stream?.OnResponseData(frameData, endStream);

if (frameData.Length > 0)
{
ExtendWindow(frameData.Length);
bool windowUpdateSent = ExtendWindow(frameData.Length);
if (http2Stream is not null && !endStream)
{
_rttEstimator.OnDataOrHeadersReceived(this, sendWindowUpdateBeforePing: !windowUpdateSent);
}
}

_incomingBuffer.Discard(frameHeader.PayloadLength);
Expand Down Expand Up @@ -1772,7 +1767,7 @@ private Task SendWindowUpdateAsync(int streamId, int amount)
});
}

private void ExtendWindow(int amount)
private bool ExtendWindow(int amount)
{
if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(amount)}={amount}");
Debug.Assert(amount > 0);
Expand All @@ -1786,14 +1781,25 @@ private void ExtendWindow(int amount)
if (_pendingWindowUpdate < ConnectionWindowThreshold)
{
if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_pendingWindowUpdate)} {_pendingWindowUpdate} < {ConnectionWindowThreshold}.");
return;
return false;
}

windowUpdateSize = _pendingWindowUpdate;
_pendingWindowUpdate = 0;
}

LogExceptions(SendWindowUpdateAsync(0, windowUpdateSize));
return true;
}

private bool ForceSendConnectionWindowUpdate()
{
if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_pendingWindowUpdate)}={_pendingWindowUpdate}");
if (_pendingWindowUpdate == 0) return false;

LogExceptions(SendWindowUpdateAsync(0, _pendingWindowUpdate));
_pendingWindowUpdate = 0;
return true;
}

public override long GetIdleTicks(long nowTicks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,18 @@ private void AdjustWindowDynamic(int bytesConsumed, Http2Stream stream)
// Assuming that the network characteristics of the connection wouldn't change much within its lifetime, we are maintaining a running minimum value.
// The more PINGs we send, the more accurate is the estimation of MinRtt, however we should be careful not to send too many of them,
// to avoid triggering the server's PING flood protection which may result in an unexpected GOAWAY.
// With most servers we are fine to send PINGs, as long as we are reading their data, this rule is well formalized for gRPC:
//
// Several strategies have been implemented to conform with real life servers.
// 1. With most servers we are fine to send PINGs as long as we are reading their data, a rule formalized by a gRPC spec:
// https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md
// As a rule of thumb, we can send send a PING whenever we receive DATA or HEADERS, however, there are some servers which allow receiving only
// a limited amount of PINGs within a given timeframe.
// To deal with the conflicting requirements:
// - We send an initial burst of 'InitialBurstCount' PINGs, to get a relatively good estimation fast
// - Afterwards, we send PINGs with the maximum frequency of 'PingIntervalInSeconds' PINGs per second
// According to this rule, we are OK to send a PING whenever we receive DATA or HEADERS, since the servers conforming to this doc
// will reset their unsolicited ping counter whenever they *send* DATA or HEADERS.
// 2. Some servers allow receiving only a limited amount of PINGs within a given timeframe.
// To deal with this, we send an initial burst of 'InitialBurstCount' (=4) PINGs, to get a relatively good estimation fast. Afterwards,
// we send PINGs each 'PingIntervalInSeconds' second, to maintain our estimation without triggering these servers.
// 3. Some servers in Google's backends reset their unsolicited ping counter when they *receive* DATA, HEADERS, or WINDOW_UPDATE.
// To deal with this, we need to make sure to send a connection WINDOW_UPDATE before sending a PING. The initial burst is an exception
// to this rule, since the mentioned server can tolerate 4 PINGs without receiving a WINDOW_UPDATE.
//
// Threading:
// OnInitialSettingsSent() is called during initialization, all other methods are triggered by HttpConnection.ProcessIncomingFramesAsync(),
Expand Down Expand Up @@ -194,7 +199,7 @@ internal void OnInitialSettingsAckReceived(Http2Connection connection)
_state = State.Waiting;
}

internal void OnDataOrHeadersReceived(Http2Connection connection)
internal void OnDataOrHeadersReceived(Http2Connection connection, bool sendWindowUpdateBeforePing)
{
if (_state != State.Waiting) return;

Expand All @@ -204,6 +209,14 @@ internal void OnDataOrHeadersReceived(Http2Connection connection)
{
if (initial) _initialBurst--;

// When sendWindowUpdateBeforePing is true, try to send a WINDOW_UPDATE to make Google backends happy.
// Unless we are doing the initial burst, do not send PING if we were not able to send the WINDOW_UPDATE.
// See point 3. in the comments above the class definition for more info.
if (sendWindowUpdateBeforePing && !connection.ForceSendConnectionWindowUpdate() && !initial)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are places that can send a windows update? Can a connection be in the situation where a window update is always being sent somewhere else and when it comes time to send a ping, the pending window update is always empty?

Copy link
Member Author

@antonfirsov antonfirsov Feb 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are places that can send a windows update?

For connection window the standard method is ExtendWindow the new one introduced in this PR is ForceSendConnectionWindowUpdate. There are no other places sending WINDOW_UPDATE for the connection.

Can a connection be in the situation where a window update is always being sent somewhere else, the pending window update is always empty?

ExtendWindow will send a "standard" connection WINDOW_UPDATE only if receiving DATA will make _pendingWindowUpdate pass ConnectionWindowThreshold. If that's the case, sendWindowUpdateBeforePing will be false at this line, so we won't send a WINDOW_UPDATE in DataOrHeadersReceived.

Note that receiving DATA triggers ExtendWindow and DataOrHeadersReceived sequentially (with the change in the PR WINDOW_UPDATE goes first):

if (frameData.Length > 0)
{
bool windowUpdateSent = ExtendWindow(frameData.Length);
if (!endStream)
{
_rttEstimator.OnDataOrHeadersReceived(this, sendWindowUpdateBeforePing: !windowUpdateSent);
}
}

JamesNK marked this conversation as resolved.
Show resolved Hide resolved
{
return;
}

// Send a PING
_pingCounter--;
if (NetEventSource.Log.IsEnabled()) connection.Trace($"[FlowControl] Sending RTT PING with payload {_pingCounter}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2453,6 +2453,7 @@ public async Task PostAsyncDuplex_ClientSendsEndStream_Success()
HttpResponseMessage response = await responseTask;
Stream responseStream = await response.Content.ReadAsStreamAsync();

connection.IgnoreWindowUpdates();
// Send some data back and forth
await SendAndReceiveResponseDataAsync(contentBytes, responseStream, connection, streamId);
await SendAndReceiveResponseDataAsync(contentBytes, responseStream, connection, streamId);
Expand Down Expand Up @@ -2513,6 +2514,7 @@ public async Task PostAsyncDuplex_ServerSendsEndStream_Success()
HttpResponseMessage response = await responseTask;
Stream responseStream = await response.Content.ReadAsStreamAsync();

connection.IgnoreWindowUpdates();
// Send some data back and forth
await SendAndReceiveResponseDataAsync(contentBytes, responseStream, connection, streamId);
await SendAndReceiveResponseDataAsync(contentBytes, responseStream, connection, streamId);
Expand Down Expand Up @@ -2833,6 +2835,7 @@ public async Task PostAsyncDuplex_DisposeResponseBodyBeforeEnd_ResetsStreamAndTh
// This allows the request processing to complete.
duplexContent.Fail(e);

connection.IgnoreWindowUpdates(); // The RTT algorithm may send a WINDOW_UPDATE before RST_STREAM.
// Client should set RST_STREAM.
await connection.ReadRstStreamAsync(streamId);
}
Expand Down Expand Up @@ -2906,6 +2909,7 @@ public async Task PostAsyncDuplex_DisposeResponseBodyAfterEndReceivedButBeforeCo
// This allows the request processing to complete.
duplexContent.Fail(e);

connection.IgnoreWindowUpdates(); // The RTT algorithm may send a WINDOW_UPDATE before RST_STREAM.
// Client should set RST_STREAM.
await connection.ReadRstStreamAsync(streamId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ static async Task RunTest()
TimeSpan.FromMilliseconds(30),
TimeSpan.Zero,
2 * 1024 * 1024,
null);
maxWindowForPingStopValidation: MaxWindow);

Assert.True(maxCredit <= MaxWindow);
}
Expand Down Expand Up @@ -181,19 +181,34 @@ static async Task RunTest()
RemoteExecutor.Invoke(RunTest, options).Dispose();
}

[OuterLoop("Runs long")]
[Fact]
public async Task LongRunningSlowServerStream_NoInvalidPingsAreSent()
{
// A scenario similar to https://github.com/grpc/grpc-dotnet/issues/2361.
// We need to send a small amount of data so the connection window is not consumed and no "standard" WINDOW_UPDATEs are sent and
// we also need to do it very slowly to cover some RTT PINGs after the initial burst.
// This scenario should trigger the "forced WINDOW_UPDATE" logic in the implementation, ensuring that no more than 4 PINGs are sent without a WINDOW_UPDATE.
await TestClientWindowScalingAsync(
TimeSpan.FromMilliseconds(500),
TimeSpan.FromMilliseconds(500),
1024,
_output,
dataPerFrame: 32);
}

private static async Task<int> TestClientWindowScalingAsync(
TimeSpan networkDelay,
TimeSpan slowBandwidthSimDelay,
int bytesToDownload,
ITestOutputHelper output = null,
int maxWindowForPingStopValidation = int.MaxValue, // set to actual maximum to test if we stop sending PING when window reached maximum
Action<SocketsHttpHandler> configureHandler = null)
int dataPerFrame = 16384,
int maxWindowForPingStopValidation = 16 * 1024 * 1024) // set to actual maximum to test if we stop sending PING when window reached maximum
{
TimeSpan timeout = TimeSpan.FromSeconds(30);
CancellationTokenSource timeoutCts = new CancellationTokenSource(timeout);

HttpClientHandler handler = CreateHttpClientHandler(HttpVersion20.Value);
configureHandler?.Invoke(GetUnderlyingSocketsHttpHandler(handler));

using Http2LoopbackServer server = Http2LoopbackServer.CreateServer(NoAutoPingResponseHttp2Options);
using HttpClient client = new HttpClient(handler, true);
Expand Down Expand Up @@ -225,13 +240,13 @@ private static async Task<int> TestClientWindowScalingAsync(
using SemaphoreSlim writeSemaphore = new SemaphoreSlim(1);
int remainingBytes = bytesToDownload;

bool pingReceivedAfterReachingMaxWindow = false;
string unexpectedPingReason = null;
bool unexpectedFrameReceived = false;
CancellationTokenSource stopFrameProcessingCts = new CancellationTokenSource();

CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stopFrameProcessingCts.Token, timeoutCts.Token);
Task processFramesTask = ProcessIncomingFramesAsync(linkedCts.Token);
byte[] buffer = new byte[16384];
byte[] buffer = new byte[dataPerFrame];

while (remainingBytes > 0)
{
Expand Down Expand Up @@ -259,7 +274,7 @@ private static async Task<int> TestClientWindowScalingAsync(

int dataReceived = (await response.Content.ReadAsByteArrayAsync()).Length;
Assert.Equal(bytesToDownload, dataReceived);
Assert.False(pingReceivedAfterReachingMaxWindow, "Server received a PING after reaching max window");
Assert.Null(unexpectedPingReason);
Assert.False(unexpectedFrameReceived, "Server received an unexpected frame, see test output for more details.");

return maxCredit;
Expand All @@ -270,6 +285,7 @@ async Task ProcessIncomingFramesAsync(CancellationToken cancellationToken)
// We should not receive any more RTT PING's after this point
int maxWindowCreditThreshold = (int) (0.9 * maxWindowForPingStopValidation);
output?.WriteLine($"maxWindowCreditThreshold: {maxWindowCreditThreshold} maxWindowForPingStopValidation: {maxWindowForPingStopValidation}");
int pingsWithoutWindowUpdate = 0;

try
{
Expand All @@ -284,10 +300,18 @@ async Task ProcessIncomingFramesAsync(CancellationToken cancellationToken)

output?.WriteLine($"Received PING ({pingFrame.Data})");

pingsWithoutWindowUpdate++;
if (maxCredit > maxWindowCreditThreshold)
{
output?.WriteLine("PING was unexpected");
Volatile.Write(ref pingReceivedAfterReachingMaxWindow, true);
Volatile.Write(ref unexpectedPingReason, "The server received a PING after reaching max window");
output?.WriteLine($"PING was unexpected: {unexpectedPingReason}");
}

// Exceeding this limit may trigger a GOAWAY on some servers. See implementation comments for more details.
if (pingsWithoutWindowUpdate > 4)
{
Volatile.Write(ref unexpectedPingReason, $"The server received {pingsWithoutWindowUpdate} PINGs without receiving a WINDOW_UPDATE");
output?.WriteLine($"PING was unexpected: {unexpectedPingReason}");
}

await writeSemaphore.WaitAsync(cancellationToken);
Expand All @@ -296,6 +320,7 @@ async Task ProcessIncomingFramesAsync(CancellationToken cancellationToken)
}
else if (frame is WindowUpdateFrame windowUpdateFrame)
{
pingsWithoutWindowUpdate = 0;
// Ignore connection window:
if (windowUpdateFrame.StreamId != streamId) continue;

Expand Down
Loading