Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[browser][http] Fix blocking of streaming response and abort #80693

Merged
merged 9 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 102 additions & 8 deletions src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -984,13 +984,14 @@ await connection.WriteStringAsync(
}

[Theory]
[InlineData(true, true)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(false, false)]
[InlineData(null, false)]
[InlineData(true, true, true)]
[InlineData(true, true, false)]
[InlineData(true, false, false)]
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
[InlineData(false, true, false)]
[InlineData(false, false, false)]
[InlineData(null, false, false)]
[ActiveIssue("https://github.com/dotnet/runtime/issues/65429", typeof(PlatformDetection), nameof(PlatformDetection.IsNodeJS))]
public async Task ReadAsStreamAsync_HandlerProducesWellBehavedResponseStream(bool? chunked, bool enableWasmStreaming)
public async Task ReadAsStreamAsync_HandlerProducesWellBehavedResponseStream(bool? chunked, bool enableWasmStreaming, bool slowChunks)
{
if (IsWinHttpHandler && UseVersion >= HttpVersion20.Value)
{
Expand All @@ -1003,6 +1004,12 @@ public async Task ReadAsStreamAsync_HandlerProducesWellBehavedResponseStream(boo
return;
}

if (enableWasmStreaming && !PlatformDetection.IsBrowser)
{
// enableWasmStreaming makes only sense on Browser platform
return;
}

await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
{
var request = new HttpRequestMessage(HttpMethod.Get, uri) { Version = UseVersion };
Expand Down Expand Up @@ -1079,11 +1086,20 @@ await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>

// Various forms of reading
var buffer = new byte[1];
var buffer2 = new byte[2];

if (PlatformDetection.IsBrowser)
{
#if !NETFRAMEWORK
Assert.Equal('h', await responseStream.ReadByteAsync());
if(slowChunks)
{
Assert.Equal(1, await responseStream.ReadAsync(new Memory<byte>(buffer2)));
Assert.Equal((byte)'h', buffer2[0]);
}
else
{
Assert.Equal('h', await responseStream.ReadByteAsync());
}
Assert.Equal('e', await responseStream.ReadByteAsync());
Assert.Equal(1, await responseStream.ReadAsync(new Memory<byte>(buffer)));
Assert.Equal((byte)'l', buffer[0]);
Expand Down Expand Up @@ -1184,7 +1200,18 @@ await server.AcceptConnectionAsync(async connection =>
{
case true:
await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false);
await connection.SendResponseBodyAsync("3\r\nhel\r\n8\r\nlo world\r\n0\r\n\r\n");
if(PlatformDetection.IsBrowser && slowChunks)
{
await connection.SendResponseBodyAsync("1\r\nh\r\n", false);
await Task.Delay(100);
await connection.SendResponseBodyAsync("2\r\nel\r\n", false);
await connection.SendResponseBodyAsync("8\r\nlo world\r\n", false);
await connection.SendResponseBodyAsync("0\r\n\r\n", true);
}
else
{
await connection.SendResponseBodyAsync("3\r\nhel\r\n8\r\nlo world\r\n0\r\n\r\n");
}
break;

case false:
Expand Down Expand Up @@ -1295,6 +1322,73 @@ await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
server => server.AcceptConnectionSendResponseAndCloseAsync());
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))]
public async Task ReadAsStreamAsync_StreamingCancellation()
{
var tcs = new TaskCompletionSource<bool>();
await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
{
var request = new HttpRequestMessage(HttpMethod.Get, uri) { Version = UseVersion };
#if !NETFRAMEWORK
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
request.Options.Set(new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse"), true);
#endif

var cts = new CancellationTokenSource();
using (var client = new HttpMessageInvoker(CreateHttpClientHandler()))
using (HttpResponseMessage response = await client.SendAsync(TestAsync, request, CancellationToken.None))
{
using (Stream responseStream = await response.Content.ReadAsStreamAsync(TestAsync))
{
var buffer = new byte[1];
#if !NETFRAMEWORK
Assert.Equal(1, await responseStream.ReadAsync(new Memory<byte>(buffer)));
Assert.Equal((byte)'h', buffer[0]);
var sizePromise = responseStream.ReadAsync(new Memory<byte>(buffer), cts.Token);
cts.Cancel();
await Assert.ThrowsAsync<TaskCanceledException>(async () => await sizePromise);
tcs.SetResult(true);
#endif
}
}
}, async server =>
{
await server.AcceptConnectionAsync(async connection =>
{
await connection.ReadRequestDataAsync();
await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false);
await connection.SendResponseBodyAsync("1\r\nh\r\n", false);
await tcs.Task;
});
});
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))]
public async Task ReadAsStreamAsync_Cancellation()
{
var tcs = new TaskCompletionSource<bool>();
await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
{
var request = new HttpRequestMessage(HttpMethod.Get, uri) { Version = UseVersion };
var cts = new CancellationTokenSource();
using (var client = new HttpMessageInvoker(CreateHttpClientHandler()))
{
var responsePromise = client.SendAsync(TestAsync, request, cts.Token);
cts.Cancel();
await Assert.ThrowsAsync<TaskCanceledException>(async () => await responsePromise);
tcs.SetResult(true);
}
}, async server =>
{
await server.AcceptConnectionAsync(async connection =>
{
await connection.ReadRequestDataAsync();
await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false);
await connection.SendResponseBodyAsync("1\r\nh\r\n", false);
await tcs.Task;
});
});
}

[Fact]
public async Task Dispose_DisposingHandlerCancelsActiveOperationsWithoutResponses()
{
Expand Down
53 changes: 25 additions & 28 deletions src/mono/wasm/runtime/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

import { wrap_as_cancelable_promise } from "./cancelable-promise";
import { Module } from "./imports";
import { MemoryViewType, Span } from "./marshal";
import { mono_assert } from "./types";
import { VoidPtr } from "./types/emscripten";
Expand All @@ -21,7 +22,12 @@ export function http_wasm_abort_request(abort_controller: AbortController): void
export function http_wasm_abort_response(res: ResponseExtension): void {
res.__abort_controller.abort();
if (res.__reader) {
res.__reader.cancel();
res.__reader.cancel().catch((err) => {
if (err && err.name !== "AbortError") {
Module.printErr("MONO_WASM: Error in http_wasm_abort_response: " + err);
maraf marked this conversation as resolved.
Show resolved Hide resolved
}
// otherwise, it's expected
});
}
}

Expand Down Expand Up @@ -100,42 +106,33 @@ export function http_wasm_get_response_bytes(res: ResponseExtension, view: Span)
return bytes_read;
}

export async function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bufferPtr: VoidPtr, bufferLength: number): Promise<number> {
export function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bufferPtr: VoidPtr, bufferLength: number): Promise<number> {
// the bufferPtr is pinned by the caller
const view = new Span(bufferPtr, bufferLength, MemoryViewType.Byte);
return wrap_as_cancelable_promise(async () => {
if (!res.__chunk && res.body) {
res.__reader = res.body.getReader();
if (!res.__reader) {
res.__reader = res.body!.getReader();
}
if (!res.__chunk) {
res.__chunk = await res.__reader.read();
res.__source_offset = 0;
}
if (res.__chunk.done) {
return 0;
}

let target_offset = 0;
let bytes_read = 0;
// loop until end of browser stream or end of C# buffer
while (res.__reader && res.__chunk && !res.__chunk.done) {
const remaining_source = res.__chunk.value.byteLength - res.__source_offset;
if (remaining_source === 0) {
res.__chunk = await res.__reader.read();
res.__source_offset = 0;
continue;// are we done yet
}

const remaining_target = view.byteLength - target_offset;
const bytes_copied = Math.min(remaining_source, remaining_target);
const source_view = res.__chunk.value.subarray(res.__source_offset, res.__source_offset + bytes_copied);

// copy available bytes
view.set(source_view, target_offset);
target_offset += bytes_copied;
bytes_read += bytes_copied;
res.__source_offset += bytes_copied;
const remaining_source = res.__chunk.value.byteLength - res.__source_offset;
mono_assert(remaining_source > 0, "expected remaining_source to be greater than 0");

if (target_offset == view.byteLength) {
return bytes_read;
}
const bytes_copied = Math.min(remaining_source, view.byteLength);
const source_view = res.__chunk.value.subarray(res.__source_offset, res.__source_offset + bytes_copied);
view.set(source_view, 0);
res.__source_offset += bytes_copied;
if (remaining_source == bytes_copied) {
res.__chunk = undefined;
}
return bytes_read;

return bytes_copied;
});
}

Expand Down