From 0ca659804490074b374d95d8b31974bf0cce0242 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Wed, 28 Aug 2024 19:24:45 +0200 Subject: [PATCH 1/5] - fix finalizer - rewrite HTTP to respect child resources --- .../src/System.Net.Http.csproj | 3 + .../Http/WasiHttpHandler/WasiHttpHandler.cs | 802 ++++-------------- .../Http/WasiHttpHandler/WasiHttpInterop.cs | 290 +++++++ .../Http/WasiHttpHandler/WasiInputStream.cs | 199 +++++ .../Http/WasiHttpHandler/WasiOutputStream.cs | 173 ++++ src/mono/mono/metadata/gc.c | 4 +- 6 files changed, 831 insertions(+), 640 deletions(-) create mode 100644 src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs create mode 100644 src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs create mode 100644 src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiOutputStream.cs diff --git a/src/libraries/System.Net.Http/src/System.Net.Http.csproj b/src/libraries/System.Net.Http/src/System.Net.Http.csproj index f127cf1f5733e..b924bf22a3dc4 100644 --- a/src/libraries/System.Net.Http/src/System.Net.Http.csproj +++ b/src/libraries/System.Net.Http/src/System.Net.Http.csproj @@ -455,6 +455,9 @@ + + + diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs index 8d9abb2d433b5..04d20595c65b1 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs @@ -14,13 +14,176 @@ using WasiHttpWorld; using WasiHttpWorld.wit.imports.wasi.http.v0_2_1; using WasiHttpWorld.wit.imports.wasi.io.v0_2_1; +using static WasiHttpWorld.wit.imports.wasi.http.v0_2_1.ITypes; namespace System.Net.Http { + // on top of https://github.com/WebAssembly/wasi-http/blob/main/wit/types.wit + internal sealed class WasiRequestWrapper : IDisposable + { + private FutureIncomingResponse? future; // owned by this instance + private WasiOutputStream? wasiOutputStream; // owned by this instance + public IncomingResponse? incomingResponse; // owned by this instance + private readonly OutgoingRequest outgoingRequest; // owned by this instance + private readonly HttpRequestMessage request; + private readonly CancellationToken cancellationToken; + public Task? requestBodyComplete; + private bool isDisposed; + + public WasiRequestWrapper(HttpRequestMessage request, CancellationToken cancellationToken) + { + if (request.RequestUri is null) + { + throw new ArgumentException(); + } + + var requestHeaders = WasiHttpInterop.ConvertRequestHeaders(request); + outgoingRequest = new OutgoingRequest(requestHeaders); // we just passed the Fields ownership to OutgoingRequest + outgoingRequest.SetMethod(WasiHttpInterop.ConvertMethod(request.Method)); + outgoingRequest.SetScheme(WasiHttpInterop.ConvertScheme(request.RequestUri)); + outgoingRequest.SetAuthority(WasiHttpInterop.ConvertAuthority(request.RequestUri)); + outgoingRequest.SetPathWithQuery(request.RequestUri.PathAndQuery); + + this.request = request; + this.cancellationToken = cancellationToken; + } + + + public async Task SendRequestAsync() + { + try + { + requestBodyComplete = SendContent(); + incomingResponse = await SendRequest().ConfigureAwait(false); + + ObjectDisposedException.ThrowIf(isDisposed, this); + cancellationToken.ThrowIfCancellationRequested(); + + var response = new HttpResponseMessage((HttpStatusCode)incomingResponse.Status()); + WasiHttpInterop.ConvertResponseHeaders(incomingResponse, response); + + + // request body could be still streaming after response headers are received and started streaming response + // we will leave scope of this method + // we need to pass the ownership of the request and this wrapper to the response (via response content stream) + // unless we know that we are not streaming anymore + WasiInputStream incomingStream = new WasiInputStream(this);// passing self ownership + response.Content = new StreamContent(incomingStream); // passing incomingStream ownership to SendAsync() caller + + return response; + } + catch + { + Dispose(); + throw; + } + } + + private async Task SendRequest() + { + Console.WriteLine("SendRequestAsync A"); + try + { + future = OutgoingHandlerInterop.Handle(outgoingRequest, null); + + while (true) + { + var response = (Result, None>?)future.Get(); + if (response.HasValue) + { + var result = response.Value.AsOk; + + if (result.IsOk) + { + Console.WriteLine("SendRequestAsync: response is OK"); + return result.AsOk; + } + else + { + throw new HttpRequestException(WasiHttpInterop.ErrorCodeToString(result.AsErr)); + } + } + else + { + Console.WriteLine("SendRequestAsync B"); + await WasiHttpInterop.RegisterWasiPollable(future.Subscribe(), cancellationToken).ConfigureAwait(false); + } + } + } + catch (OperationCanceledException oce) + { + if (cancellationToken.IsCancellationRequested) + { + Http.CancellationHelper.ThrowIfCancellationRequested(oce, cancellationToken); + } + throw; + } + catch (WitException e) + { + throw new HttpRequestException(WasiHttpInterop.ErrorCodeToString((ErrorCode)e.Value), e); + } + } + + public async Task SendContent() + { + var content = request.Content; + if (content is not null) + { + wasiOutputStream = new WasiOutputStream(outgoingRequest.Body()); // passing body ownership + await content.CopyToAsync(wasiOutputStream, cancellationToken).ConfigureAwait(false); + wasiOutputStream.Close(); + } + } + + public void Dispose() + { + if (!isDisposed) + { + isDisposed = true; + Console.WriteLine("WasiRequestWrapper.Dispose A"); + wasiOutputStream?.Dispose(); + Console.WriteLine("WasiRequestWrapper.Dispose B"); + incomingResponse?.Dispose(); + Console.WriteLine("WasiRequestWrapper.Dispose C"); + outgoingRequest.Dispose(); + Console.WriteLine("WasiRequestWrapper.Dispose D"); + future?.Dispose(); + Console.WriteLine("WasiRequestWrapper.Dispose E"); + } + } + } + internal sealed class WasiHttpHandler : HttpMessageHandler { + public const bool SupportsAutomaticDecompression = false; + public const bool SupportsProxy = false; + public const bool SupportsRedirectConfiguration = false; + + private Dictionary? _properties; + public IDictionary Properties => _properties ??= new Dictionary(); + + protected internal override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + var wasiRequest = new WasiRequestWrapper(request, cancellationToken); + try + { + return await wasiRequest.SendRequestAsync().ConfigureAwait(false); + } + catch + { + // if there was exception or cancellation, we need to dispose the request + // otherwise it will be disposed by the response + wasiRequest.Dispose(); + throw; + } + } + #region PlatformNotSupported #pragma warning disable CA1822 + + internal ClientCertificateOption ClientCertificateOptions; + + public bool UseCookies { get => throw new PlatformNotSupportedException(); @@ -99,17 +262,6 @@ public bool AllowAutoRedirect set => throw new PlatformNotSupportedException(); } #pragma warning restore CA1822 - #endregion - - internal ClientCertificateOption ClientCertificateOptions; - - public const bool SupportsAutomaticDecompression = false; - public const bool SupportsProxy = false; - public const bool SupportsRedirectConfiguration = false; - - private Dictionary? _properties; - public IDictionary Properties => - _properties ??= new Dictionary(); protected internal override HttpResponseMessage Send( HttpRequestMessage request, @@ -119,632 +271,6 @@ CancellationToken cancellationToken throw new PlatformNotSupportedException(); } - protected internal override async Task SendAsync( - HttpRequestMessage request, - CancellationToken cancellationToken - ) - { - if (request.RequestUri is null) - { - throw new ArgumentException(); - } - - var requestMethod = request.Method.ToString(); - var uri = request.RequestUri; - - ITypes.Method method; - switch (requestMethod) - { - case "": - case "GET": - method = ITypes.Method.get(); - break; - case "HEAD": - method = ITypes.Method.head(); - break; - case "POST": - method = ITypes.Method.post(); - break; - case "PUT": - method = ITypes.Method.put(); - break; - case "DELETE": - method = ITypes.Method.delete(); - break; - case "CONNECT": - method = ITypes.Method.connect(); - break; - case "OPTIONS": - method = ITypes.Method.options(); - break; - case "TRACE": - method = ITypes.Method.trace(); - break; - case "PATCH": - method = ITypes.Method.patch(); - break; - default: - method = ITypes.Method.other(requestMethod); - break; - } - - ITypes.Scheme scheme; - switch (uri.Scheme) - { - case "": - case "http": - scheme = ITypes.Scheme.http(); - break; - case "https": - scheme = ITypes.Scheme.https(); - break; - default: - scheme = ITypes.Scheme.other(uri.Scheme); - break; - } - - string authority; - if (uri.Authority.Length == 0) - { - // `wasi:http/outgoing-handler` requires a non-empty authority, - // so we set one here: - if (scheme.Tag == ITypes.Scheme.HTTPS) - { - authority = ":443"; - } - else - { - authority = ":80"; - } - } - else - { - authority = uri.Authority; - } - - var headers = new List<(string, byte[])>(); - foreach (var pair in request.Headers) - { - foreach (var value in pair.Value) - { - headers.Add((pair.Key, Encoding.UTF8.GetBytes(value))); - } - } - if (request.Content is not null) - { - foreach (var pair in request.Content.Headers) - { - foreach (var value in pair.Value) - { - headers.Add((pair.Key, Encoding.UTF8.GetBytes(value))); - } - } - } - - var outgoingRequest = new ITypes.OutgoingRequest(ITypes.Fields.FromList(headers)); - outgoingRequest.SetMethod(method); - outgoingRequest.SetScheme(scheme); - outgoingRequest.SetAuthority(authority); - outgoingRequest.SetPathWithQuery(uri.PathAndQuery); - - var outgoingStream = new OutputStream(outgoingRequest.Body()); - - Func> sendContent = async () => - { - await SendContentAsync(request.Content, outgoingStream).ConfigureAwait(false); - return null; - }; - - // Concurrently send the request and the content stream, allowing - // the server to start sending a response before it's received the - // entire request body. - var incomingResponse = ( - await Task.WhenAll( - new Task[] - { - SendRequestAsync(outgoingRequest, cancellationToken), - sendContent() - } - ) - .ConfigureAwait(false) - )[0]; - - if (incomingResponse is null) - { - // Shouldn't be possible, since `SendRequestAsync` always - // returns a non-null value. - throw new Exception("unreachable code"); - } - - var response = new HttpResponseMessage((HttpStatusCode)incomingResponse.Status()); - var responseHeaders = incomingResponse.Headers().Entries(); - response.Content = new StreamContent(new InputStream(incomingResponse.Consume())); - foreach ((var key, var value) in responseHeaders) - { - var valueString = Encoding.UTF8.GetString(value); - if ( - HeaderDescriptor.TryGet(key, out HeaderDescriptor descriptor) - && (descriptor.HeaderType & HttpHeaderType.Content) != 0 - ) - { - response.Content.Headers.Add(key, valueString); - } - else - { - response.Headers.Add(key, valueString); - } - } - - return response; - } - - private static async Task SendRequestAsync( - ITypes.OutgoingRequest request, - CancellationToken cancellationToken - ) - { - ITypes.FutureIncomingResponse future; - try - { - future = OutgoingHandlerInterop.Handle(request, null); - } - catch (WasiHttpWorld.WitException e) - { - var message = ErrorCodeToString((ITypes.ErrorCode)e.Value); - throw new Exception($"Request Error: {message}"); - } - - while (true) - { - var response = future.Get(); - if (response is not null) - { - var result = ( - (Result, None>)response - ).AsOk; - - if (result.IsOk) - { - return result.AsOk; - } - else - { - var message = ErrorCodeToString(result.AsErr); - throw new Exception($"Request Error: {message}"); - } - } - else - { - await RegisterWasiPollable(future.Subscribe(), cancellationToken).ConfigureAwait(false); - } - } - } - - private static string ErrorCodeToString(ITypes.ErrorCode code) - { - // TODO: include payload data in result where applicable - switch (code.Tag) - { - case ITypes.ErrorCode.DNS_TIMEOUT: - return "DNS_TIMEOUT"; - - case ITypes.ErrorCode.DNS_ERROR: - return "DNS_ERROR"; - - case ITypes.ErrorCode.DESTINATION_NOT_FOUND: - return "DESTINATION_NOT_FOUND"; - - case ITypes.ErrorCode.DESTINATION_UNAVAILABLE: - return "DESTINATION_UNAVAILABLE"; - - case ITypes.ErrorCode.DESTINATION_IP_PROHIBITED: - return "DESTINATION_IP_PROHIBITED"; - - case ITypes.ErrorCode.DESTINATION_IP_UNROUTABLE: - return "DESTINATION_IP_UNROUTABLE"; - - case ITypes.ErrorCode.CONNECTION_REFUSED: - return "CONNECTION_REFUSED"; - - case ITypes.ErrorCode.CONNECTION_TERMINATED: - return "CONNECTION_TERMINATED"; - - case ITypes.ErrorCode.CONNECTION_TIMEOUT: - return "CONNECTION_TIMEOUT"; - - case ITypes.ErrorCode.CONNECTION_READ_TIMEOUT: - return "CONNECTION_READ_TIMEOUT"; - - case ITypes.ErrorCode.CONNECTION_WRITE_TIMEOUT: - return "CONNECTION_WRITE_TIMEOUT"; - - case ITypes.ErrorCode.CONNECTION_LIMIT_REACHED: - return "CONNECTION_LIMIT_REACHED"; - - case ITypes.ErrorCode.TLS_PROTOCOL_ERROR: - return "TLS_PROTOCOL_ERROR"; - - case ITypes.ErrorCode.TLS_CERTIFICATE_ERROR: - return "TLS_CERTIFICATE_ERROR"; - - case ITypes.ErrorCode.TLS_ALERT_RECEIVED: - return "TLS_ALERT_RECEIVED"; - - case ITypes.ErrorCode.HTTP_REQUEST_DENIED: - return "HTTP_REQUEST_DENIED"; - - case ITypes.ErrorCode.HTTP_REQUEST_LENGTH_REQUIRED: - return "HTTP_REQUEST_LENGTH_REQUIRED"; - - case ITypes.ErrorCode.HTTP_REQUEST_BODY_SIZE: - return "HTTP_REQUEST_BODY_SIZE"; - - case ITypes.ErrorCode.HTTP_REQUEST_METHOD_INVALID: - return "HTTP_REQUEST_METHOD_INVALID"; - - case ITypes.ErrorCode.HTTP_REQUEST_URI_INVALID: - return "HTTP_REQUEST_URI_INVALID"; - - case ITypes.ErrorCode.HTTP_REQUEST_URI_TOO_LONG: - return "HTTP_REQUEST_URI_TOO_LONG"; - - case ITypes.ErrorCode.HTTP_REQUEST_HEADER_SECTION_SIZE: - return "HTTP_REQUEST_HEADER_SECTION_SIZE"; - - case ITypes.ErrorCode.HTTP_REQUEST_HEADER_SIZE: - return "HTTP_REQUEST_HEADER_SIZE"; - - case ITypes.ErrorCode.HTTP_REQUEST_TRAILER_SECTION_SIZE: - return "HTTP_REQUEST_TRAILER_SECTION_SIZE"; - - case ITypes.ErrorCode.HTTP_REQUEST_TRAILER_SIZE: - return "HTTP_REQUEST_TRAILER_SIZE"; - - case ITypes.ErrorCode.HTTP_RESPONSE_INCOMPLETE: - return "HTTP_RESPONSE_INCOMPLETE"; - - case ITypes.ErrorCode.HTTP_RESPONSE_HEADER_SECTION_SIZE: - return "HTTP_RESPONSE_HEADER_SECTION_SIZE"; - - case ITypes.ErrorCode.HTTP_RESPONSE_HEADER_SIZE: - return "HTTP_RESPONSE_HEADER_SIZE"; - - case ITypes.ErrorCode.HTTP_RESPONSE_BODY_SIZE: - return "HTTP_RESPONSE_BODY_SIZE"; - - case ITypes.ErrorCode.HTTP_RESPONSE_TRAILER_SECTION_SIZE: - return "HTTP_RESPONSE_TRAILER_SECTION_SIZE"; - - case ITypes.ErrorCode.HTTP_RESPONSE_TRAILER_SIZE: - return "HTTP_RESPONSE_TRAILER_SIZE"; - - case ITypes.ErrorCode.HTTP_RESPONSE_TRANSFER_CODING: - return "HTTP_RESPONSE_TRANSFER_CODING"; - - case ITypes.ErrorCode.HTTP_RESPONSE_CONTENT_CODING: - return "HTTP_RESPONSE_CONTENT_CODING"; - - case ITypes.ErrorCode.HTTP_RESPONSE_TIMEOUT: - return "HTTP_RESPONSE_TIMEOUT"; - - case ITypes.ErrorCode.HTTP_UPGRADE_FAILED: - return "HTTP_UPGRADE_FAILED"; - - case ITypes.ErrorCode.HTTP_PROTOCOL_ERROR: - return "HTTP_PROTOCOL_ERROR"; - - case ITypes.ErrorCode.LOOP_DETECTED: - return "LOOP_DETECTED"; - - case ITypes.ErrorCode.CONFIGURATION_ERROR: - return "CONFIGURATION_ERROR"; - - case ITypes.ErrorCode.INTERNAL_ERROR: - return "INTERNAL_ERROR"; - - default: - return $"{code.Tag}"; - } - } - - private static async Task SendContentAsync(HttpContent? content, Stream stream) - { - try - { - if (content is not null) - { - await content.CopyToAsync(stream).ConfigureAwait(false); - } - } - finally - { - stream.Dispose(); - } - } - - private static Task RegisterWasiPollable(IPoll.Pollable pollable, CancellationToken cancellationToken) - { - var handle = pollable.Handle; - - // this will effectively neutralize Dispose() of the Pollable() - // because in the CoreLib we create another instance, which will dispose it - pollable.Handle = 0; - GC.SuppressFinalize(pollable); - - return CallRegisterWasiPollableHandle((Thread)null!, handle, cancellationToken); - - } - - [UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "RegisterWasiPollableHandle")] - private static extern Task CallRegisterWasiPollableHandle(Thread t, int handle, CancellationToken cancellationToken); - - private sealed class InputStream : Stream - { - private ITypes.IncomingBody body; - private IStreams.InputStream stream; - private int offset; - private byte[]? buffer; - private bool closed; - - public InputStream(ITypes.IncomingBody body) - { - this.body = body; - this.stream = body.Stream(); - } - - ~InputStream() - { - Dispose(false); - } - - public override bool CanRead => true; - public override bool CanWrite => false; - public override bool CanSeek => false; - public override long Length => throw new NotImplementedException(); - public override long Position - { - get => throw new NotImplementedException(); - set => throw new NotImplementedException(); - } - - protected override void Dispose(bool disposing) - { - stream.Dispose(); - ITypes.IncomingBody.Finish(body); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotImplementedException(); - } - - public override void Flush() - { - // ignore - } - - public override void SetLength(long length) - { - throw new NotImplementedException(); - } - - public override int Read(byte[] buffer, int offset, int length) - { - throw new NotImplementedException(); - } - - public override void Write(byte[] buffer, int offset, int length) - { - throw new NotImplementedException(); - } - - public override async Task ReadAsync( - byte[] bytes, - int offset, - int length, - CancellationToken cancellationToken - ) - { - // TODO: handle `cancellationToken` - while (true) - { - if (closed) - { - return 0; - } - else if (this.buffer == null) - { - try - { - // TODO: should we add a special case to the bindings generator - // to allow passing a buffer to IStreams.InputStream.Read and - // avoid the extra copy? - var result = stream.Read(16 * 1024); - var buffer = result; - if (buffer.Length == 0) - { - await RegisterWasiPollable(stream.Subscribe(), cancellationToken) - .ConfigureAwait(false); - } - else - { - this.buffer = buffer; - this.offset = 0; - } - } - catch (WitException e) - { - if (((IStreams.StreamError)e.Value).Tag == IStreams.StreamError.CLOSED) - { - closed = true; - return 0; - } - else - { - throw e; - } - } - } - else - { - var min = Math.Min(this.buffer.Length - this.offset, length); - Array.Copy(this.buffer, this.offset, bytes, offset, min); - if (min < buffer.Length - this.offset) - { - this.offset += min; - } - else - { - this.buffer = null; - } - return min; - } - } - } - - public override async ValueTask ReadAsync( - Memory buffer, - CancellationToken cancellationToken = default - ) - { - // TODO: avoid copy when possible and use ArrayPool when not - var dst = new byte[buffer.Length]; - // We disable "CA1835: Prefer the memory-based overloads of - // ReadAsync/WriteAsync methods in stream-based classes" for - // now, since `ReadyAsync(byte[], int, int, CancellationToken)` - // is where the implementation currently resides, but we should - // revisit this if/when `wit-bindgen` learns to generate - // memory-based bindings. -#pragma warning disable CA1835 - var result = await ReadAsync(dst, 0, buffer.Length, cancellationToken) - .ConfigureAwait(false); -#pragma warning restore CA1835 - new ReadOnlySpan(dst, 0, result).CopyTo(buffer.Span); - return result; - } - } - - private sealed class OutputStream : Stream - { - private ITypes.OutgoingBody body; - private IStreams.OutputStream stream; - - public OutputStream(ITypes.OutgoingBody body) - { - this.body = body; - this.stream = body.Write(); - } - - ~OutputStream() - { - Dispose(false); - } - - public override bool CanRead => false; - public override bool CanWrite => true; - public override bool CanSeek => false; - public override long Length => throw new NotImplementedException(); - public override long Position - { - get => throw new NotImplementedException(); - set => throw new NotImplementedException(); - } - - protected override void Dispose(bool disposing) - { - stream.Dispose(); - ITypes.OutgoingBody.Finish(body, null); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotImplementedException(); - } - - public override void Flush() - { - // ignore - // - // Note that flushing a `wasi:io/streams/output-stream` is an - // asynchronous operation, so it's not clear how we would - // implement it here instead of taking care of it as part of - // `WriteAsync`. - } - - public override void SetLength(long length) - { - throw new NotImplementedException(); - } - - public override int Read(byte[] buffer, int offset, int length) - { - throw new NotImplementedException(); - } - - public override void Write(byte[] buffer, int offset, int length) - { - throw new NotImplementedException(); - } - - public override async Task WriteAsync( - byte[] bytes, - int offset, - int length, - CancellationToken cancellationToken - ) - { - var limit = offset + length; - var flushing = false; - while (true) - { - var count = (int)stream.CheckWrite(); - if (count == 0) - { - await RegisterWasiPollable(stream.Subscribe(), cancellationToken).ConfigureAwait(false); - } - else if (offset == limit) - { - if (flushing) - { - return; - } - else - { - stream.Flush(); - flushing = true; - } - } - else - { - var min = Math.Min(count, limit - offset); - if (offset == 0 && min == bytes.Length) - { - stream.Write(bytes); - } - else - { - // TODO: is there a more efficient option than copying here? - // Do we need to change the binding generator to accept - // e.g. `Span`s? - var copy = new byte[min]; - Array.Copy(bytes, offset, copy, 0, min); - stream.Write(copy); - } - offset += min; - } - } - } - - public override ValueTask WriteAsync( - ReadOnlyMemory buffer, - CancellationToken cancellationToken = default - ) - { - // TODO: avoid copy when possible and use ArrayPool when not - var copy = new byte[buffer.Length]; - buffer.Span.CopyTo(copy); - return new ValueTask(WriteAsync(copy, 0, buffer.Length, cancellationToken)); - } - } + #endregion } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs new file mode 100644 index 0000000000000..b53ebcd999ffa --- /dev/null +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs @@ -0,0 +1,290 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Net.Http.Headers; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices.Marshalling; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using WasiHttpWorld; +using WasiHttpWorld.wit.imports.wasi.http.v0_2_1; +using WasiHttpWorld.wit.imports.wasi.io.v0_2_1; +using static WasiHttpWorld.wit.imports.wasi.http.v0_2_1.ITypes; +using static WasiHttpWorld.wit.imports.wasi.io.v0_2_1.IStreams; + +namespace System.Net.Http +{ + internal static class WasiHttpInterop + { + public static Task RegisterWasiPollable(IPoll.Pollable pollable, CancellationToken cancellationToken) + { + var handle = pollable.Handle; + + // this will effectively neutralize Dispose() of the Pollable() + // because in the CoreLib we create another instance, which will dispose it + pollable.Handle = 0; + GC.SuppressFinalize(pollable); + + return CallRegisterWasiPollableHandle((Thread)null!, handle, cancellationToken); + + [UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "RegisterWasiPollableHandle")] + static extern Task CallRegisterWasiPollableHandle(Thread t, int handle, CancellationToken cancellationToken); + } + + public static Method ConvertMethod(HttpMethod requestMethod) + { + Method method; + switch (requestMethod.Method) + { + case "": + case "GET": + method = Method.get(); + break; + case "HEAD": + method = Method.head(); + break; + case "POST": + method = Method.post(); + break; + case "PUT": + method = Method.put(); + break; + case "DELETE": + method = Method.delete(); + break; + case "CONNECT": + method = Method.connect(); + break; + case "OPTIONS": + method = Method.options(); + break; + case "TRACE": + method = Method.trace(); + break; + case "PATCH": + method = Method.patch(); + break; + default: + method = Method.other(requestMethod.Method); + break; + } + return method; + } + + public static Scheme ConvertScheme(Uri uri) + { + Scheme scheme; + switch (uri.Scheme) + { + case "": + case "http": + scheme = Scheme.http(); + break; + case "https": + scheme = Scheme.https(); + break; + default: + scheme = Scheme.other(uri.Scheme); + break; + } + return scheme; + } + + public static string ConvertAuthority(Uri uri) + { + // `wasi:http/outgoing-handler` requires a non-empty authority, + // so we set one here: + if (string.IsNullOrEmpty(uri.Authority)) + { + if (uri.Scheme == "https") + { + return ":443"; + } + else + { + return ":80"; + } + } + else + { + return uri.Authority; + } + } + + public static Fields ConvertRequestHeaders(HttpRequestMessage request) + { + var headers = new List<(string, byte[])>(); + foreach (var pair in request.Headers) + { + foreach (var value in pair.Value) + { + headers.Add((pair.Key, Encoding.UTF8.GetBytes(value))); + } + } + if (request.Content is not null) + { + foreach (var pair in request.Content.Headers) + { + foreach (var value in pair.Value) + { + headers.Add((pair.Key, Encoding.UTF8.GetBytes(value))); + } + } + } + return Fields.FromList(headers); + } + + public static void ConvertResponseHeaders(IncomingResponse incomingResponse, HttpResponseMessage response) + { + using var headers = incomingResponse.Headers(); + foreach ((var key, var value) in headers.Entries()) + { + var valueString = Encoding.UTF8.GetString(value); + if (IsContentHeader(key)) + { + response.Content.Headers.Add(key, valueString); + } + else + { + response.Headers.Add(key, valueString); + } + } + } + + private static bool IsContentHeader(string headerName) + { + return HeaderDescriptor.TryGet(headerName, out HeaderDescriptor descriptor) && (descriptor.HeaderType & HttpHeaderType.Content) != 0; + } + + public static string ErrorCodeToString(ErrorCode code) + { + // TODO: include payload data in result where applicable + switch (code.Tag) + { + case ErrorCode.DNS_TIMEOUT: + return "DNS_TIMEOUT"; + + case ErrorCode.DNS_ERROR: + return "DNS_ERROR"; + + case ErrorCode.DESTINATION_NOT_FOUND: + return "DESTINATION_NOT_FOUND"; + + case ErrorCode.DESTINATION_UNAVAILABLE: + return "DESTINATION_UNAVAILABLE"; + + case ErrorCode.DESTINATION_IP_PROHIBITED: + return "DESTINATION_IP_PROHIBITED"; + + case ErrorCode.DESTINATION_IP_UNROUTABLE: + return "DESTINATION_IP_UNROUTABLE"; + + case ErrorCode.CONNECTION_REFUSED: + return "CONNECTION_REFUSED"; + + case ErrorCode.CONNECTION_TERMINATED: + return "CONNECTION_TERMINATED"; + + case ErrorCode.CONNECTION_TIMEOUT: + return "CONNECTION_TIMEOUT"; + + case ErrorCode.CONNECTION_READ_TIMEOUT: + return "CONNECTION_READ_TIMEOUT"; + + case ErrorCode.CONNECTION_WRITE_TIMEOUT: + return "CONNECTION_WRITE_TIMEOUT"; + + case ErrorCode.CONNECTION_LIMIT_REACHED: + return "CONNECTION_LIMIT_REACHED"; + + case ErrorCode.TLS_PROTOCOL_ERROR: + return "TLS_PROTOCOL_ERROR"; + + case ErrorCode.TLS_CERTIFICATE_ERROR: + return "TLS_CERTIFICATE_ERROR"; + + case ErrorCode.TLS_ALERT_RECEIVED: + return "TLS_ALERT_RECEIVED"; + + case ErrorCode.HTTP_REQUEST_DENIED: + return "HTTP_REQUEST_DENIED"; + + case ErrorCode.HTTP_REQUEST_LENGTH_REQUIRED: + return "HTTP_REQUEST_LENGTH_REQUIRED"; + + case ErrorCode.HTTP_REQUEST_BODY_SIZE: + return "HTTP_REQUEST_BODY_SIZE"; + + case ErrorCode.HTTP_REQUEST_METHOD_INVALID: + return "HTTP_REQUEST_METHOD_INVALID"; + + case ErrorCode.HTTP_REQUEST_URI_INVALID: + return "HTTP_REQUEST_URI_INVALID"; + + case ErrorCode.HTTP_REQUEST_URI_TOO_LONG: + return "HTTP_REQUEST_URI_TOO_LONG"; + + case ErrorCode.HTTP_REQUEST_HEADER_SECTION_SIZE: + return "HTTP_REQUEST_HEADER_SECTION_SIZE"; + + case ErrorCode.HTTP_REQUEST_HEADER_SIZE: + return "HTTP_REQUEST_HEADER_SIZE"; + + case ErrorCode.HTTP_REQUEST_TRAILER_SECTION_SIZE: + return "HTTP_REQUEST_TRAILER_SECTION_SIZE"; + + case ErrorCode.HTTP_REQUEST_TRAILER_SIZE: + return "HTTP_REQUEST_TRAILER_SIZE"; + + case ErrorCode.HTTP_RESPONSE_INCOMPLETE: + return "HTTP_RESPONSE_INCOMPLETE"; + + case ErrorCode.HTTP_RESPONSE_HEADER_SECTION_SIZE: + return "HTTP_RESPONSE_HEADER_SECTION_SIZE"; + + case ErrorCode.HTTP_RESPONSE_HEADER_SIZE: + return "HTTP_RESPONSE_HEADER_SIZE"; + + case ErrorCode.HTTP_RESPONSE_BODY_SIZE: + return "HTTP_RESPONSE_BODY_SIZE"; + + case ErrorCode.HTTP_RESPONSE_TRAILER_SECTION_SIZE: + return "HTTP_RESPONSE_TRAILER_SECTION_SIZE"; + + case ErrorCode.HTTP_RESPONSE_TRAILER_SIZE: + return "HTTP_RESPONSE_TRAILER_SIZE"; + + case ErrorCode.HTTP_RESPONSE_TRANSFER_CODING: + return "HTTP_RESPONSE_TRANSFER_CODING"; + + case ErrorCode.HTTP_RESPONSE_CONTENT_CODING: + return "HTTP_RESPONSE_CONTENT_CODING"; + + case ErrorCode.HTTP_RESPONSE_TIMEOUT: + return "HTTP_RESPONSE_TIMEOUT"; + + case ErrorCode.HTTP_UPGRADE_FAILED: + return "HTTP_UPGRADE_FAILED"; + + case ErrorCode.HTTP_PROTOCOL_ERROR: + return "HTTP_PROTOCOL_ERROR"; + + case ErrorCode.LOOP_DETECTED: + return "LOOP_DETECTED"; + + case ErrorCode.CONFIGURATION_ERROR: + return "CONFIGURATION_ERROR"; + + case ErrorCode.INTERNAL_ERROR: + return "INTERNAL_ERROR"; + + default: + return $"{code.Tag}"; + } + } + } +} diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs new file mode 100644 index 0000000000000..4993408f172e0 --- /dev/null +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs @@ -0,0 +1,199 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using WasiHttpWorld; +using WasiHttpWorld.wit.imports.wasi.http.v0_2_1; +using WasiHttpWorld.wit.imports.wasi.io.v0_2_1; +using static WasiHttpWorld.wit.imports.wasi.http.v0_2_1.ITypes; +using static WasiHttpWorld.wit.imports.wasi.io.v0_2_1.IStreams; + +namespace System.Net.Http +{ + // on top of https://github.com/WebAssembly/wasi-io/blob/main/wit/streams.wit + internal sealed class WasiInputStream : Stream + { + private WasiRequestWrapper wrapper; // owned by this instance + private IncomingBody body; // owned by this instance + private InputStream stream; // owned by this instance + + private int offset; + private byte[]? buffer; + private bool otherSideClosed; + internal bool isClosed; + + public override bool CanRead => true; + public override bool CanWrite => false; + public override bool CanSeek => false; + + public WasiInputStream(WasiRequestWrapper wrapper) + { + this.wrapper = wrapper; + this.body = wrapper.incomingResponse!.Consume(); + this.stream = body.Stream(); + } + + ~WasiInputStream() + { + Dispose(false); + } + + public override void Close() + { + Console.WriteLine("WasiInputStream.Close " + isClosed); + if (!isClosed) + { + isClosed = true; + stream.Dispose(); + var futureTrailers = IncomingBody.Finish(body); // we just passed body ownership to Finish + futureTrailers.Dispose(); + if (wrapper.requestBodyComplete != null && wrapper.requestBodyComplete.IsCompleted) + { + Console.WriteLine("WasiInputStream.Close WRAPPER"); + wrapper.Dispose(); + } + } + base.Close(); + } + + protected override void Dispose(bool disposing) + { + Console.WriteLine("WasiInputStream.Dispose" + isClosed); + if (!isClosed) + { + isClosed = true; + stream.Dispose(); + body.Dispose(); + } + wrapper.Dispose(); + + base.Dispose(disposing); + } + + public override async Task ReadAsync( + byte[] bytes, + int offset, + int length, + CancellationToken cancellationToken + ) + { + ObjectDisposedException.ThrowIf(isClosed, this); + cancellationToken.ThrowIfCancellationRequested(); + while (true) + { + if (otherSideClosed) + { + return 0; + } + else if (this.buffer == null) + { + try + { + // TODO: should we add a special case to the bindings generator + // to allow passing a buffer to InputStream.Read and + // avoid the extra copy? + var result = stream.Read(16 * 1024); + var buffer = result; + if (buffer.Length == 0) + { + cancellationToken.ThrowIfCancellationRequested(); + await WasiHttpInterop.RegisterWasiPollable(stream.Subscribe(), cancellationToken).ConfigureAwait(false); + ObjectDisposedException.ThrowIf(isClosed, this); + } + else + { + this.buffer = buffer; + this.offset = 0; + } + } + catch (WitException e) + { + if (((StreamError)e.Value).Tag == StreamError.CLOSED) + { + otherSideClosed = true; + return 0; + } + else + { + // TODO translate error ? + throw; + } + } + } + else + { + var min = Math.Min(this.buffer.Length - this.offset, length); + Array.Copy(this.buffer, this.offset, bytes, offset, min); + if (min < buffer.Length - this.offset) + { + this.offset += min; + } + else + { + this.buffer = null; + } + return min; + } + } + } + + public override async ValueTask ReadAsync( + Memory buffer, + CancellationToken cancellationToken = default + ) + { + // TODO: avoid copy when possible and use ArrayPool when not + var dst = new byte[buffer.Length]; + // We disable "CA1835: Prefer the memory-based overloads of + // ReadAsync/WriteAsync methods in stream-based classes" for + // now, since `ReadyAsync(byte[], int, int, CancellationToken)` + // is where the implementation currently resides, but we should + // revisit this if/when `wit-bindgen` learns to generate + // memory-based bindings. +#pragma warning disable CA1835 + var result = await ReadAsync(dst, 0, buffer.Length, cancellationToken) + .ConfigureAwait(false); +#pragma warning restore CA1835 + new ReadOnlySpan(dst, 0, result).CopyTo(buffer.Span); + return result; + } + + #region PlatformNotSupported + + public override void Flush() + { + // ignore + } + + public override void SetLength(long length) + { + throw new PlatformNotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int length) + { + throw new PlatformNotSupportedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new PlatformNotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int length) + { + throw new PlatformNotSupportedException(); + } + + public override long Length => throw new PlatformNotSupportedException(); + public override long Position + { + get => throw new PlatformNotSupportedException(); + set => throw new PlatformNotSupportedException(); + } + + #endregion + } +} diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiOutputStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiOutputStream.cs new file mode 100644 index 0000000000000..22740b85a3a76 --- /dev/null +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiOutputStream.cs @@ -0,0 +1,173 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using WasiHttpWorld; +using WasiHttpWorld.wit.imports.wasi.http.v0_2_1; +using WasiHttpWorld.wit.imports.wasi.io.v0_2_1; +using static WasiHttpWorld.wit.imports.wasi.http.v0_2_1.ITypes; +using static WasiHttpWorld.wit.imports.wasi.io.v0_2_1.IStreams; + +namespace System.Net.Http +{ + // on top of https://github.com/WebAssembly/wasi-io/blob/main/wit/streams.wit + internal sealed class WasiOutputStream : Stream + { + private OutputStream stream; // owned by this instance + private OutgoingBody body; // owned by this instance + internal bool isClosed; + + public override bool CanRead => false; + public override bool CanWrite => true; + public override bool CanSeek => false; + + public WasiOutputStream(OutgoingBody body) + { + this.body = body; + this.stream = body.Write(); + } + + ~WasiOutputStream() + { + Dispose(false); + } + + public override void Close() + { + Console.WriteLine("WasiOutputStream.Close " + isClosed); + if (!isClosed) + { + Console.WriteLine("WasiOutputStream.Close A"); + isClosed = true; + Console.WriteLine("WasiOutputStream.Close B"); + stream.Dispose(); + Console.WriteLine("WasiOutputStream.Close C"); + OutgoingBody.Finish(body, null); + Console.WriteLine("WasiOutputStream.Close D"); + } + base.Close(); + Console.WriteLine("WasiOutputStream.Close E"); + } + + protected override void Dispose(bool disposing) + { + Console.WriteLine("WasiOutputStream.Dispose" + isClosed); + if (!isClosed) + { + Console.WriteLine("WasiOutputStream.Dispose A"); + isClosed = true; + stream.Dispose(); + Console.WriteLine("WasiOutputStream.Dispose B"); + body.Dispose(); + Console.WriteLine("WasiOutputStream.Dispose C"); + } + base.Dispose(disposing); + Console.WriteLine("WasiOutputStream.Dispose E"); + } + + public override async Task WriteAsync( + byte[] bytes, + int offset, + int length, + CancellationToken cancellationToken + ) + { + ObjectDisposedException.ThrowIf(isClosed, this); + var limit = offset + length; + var flushing = false; + while (true) + { + var count = (int)stream.CheckWrite(); + if (count == 0) + { + await WasiHttpInterop.RegisterWasiPollable(stream.Subscribe(), cancellationToken).ConfigureAwait(false); + ObjectDisposedException.ThrowIf(isClosed, this); + } + else if (offset == limit) + { + if (flushing) + { + return; + } + else + { + stream.Flush(); + flushing = true; + } + } + else + { + var min = Math.Min(count, limit - offset); + if (offset == 0 && min == bytes.Length) + { + stream.Write(bytes); + } + else + { + // TODO: is there a more efficient option than copying here? + // Do we need to change the binding generator to accept + // e.g. `Span`s? + var copy = new byte[min]; + Array.Copy(bytes, offset, copy, 0, min); + stream.Write(copy); + } + offset += min; + } + } + } + + public override ValueTask WriteAsync( + ReadOnlyMemory buffer, + CancellationToken cancellationToken = default + ) + { + // TODO: avoid copy when possible and use ArrayPool when not + var copy = new byte[buffer.Length]; + buffer.Span.CopyTo(copy); + return new ValueTask(WriteAsync(copy, 0, buffer.Length, cancellationToken)); + } + + #region PlatformNotSupported + + public override void Flush() + { + // ignore + // + // Note that flushing a `wasi:io/streams/output-stream` is an + // asynchronous operation, so it's not clear how we would + // implement it here instead of taking care of it as part of + // `WriteAsync`. + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new PlatformNotSupportedException(); + } + + public override void SetLength(long length) + { + throw new PlatformNotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int length) + { + throw new PlatformNotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int length) + { + throw new PlatformNotSupportedException(); + } + + public override long Length => throw new PlatformNotSupportedException(); + public override long Position + { + get => throw new PlatformNotSupportedException(); + set => throw new PlatformNotSupportedException(); + } + + #endregion + } +} diff --git a/src/mono/mono/metadata/gc.c b/src/mono/mono/metadata/gc.c index b46971f1a9a7e..5ef0faa9f8051 100644 --- a/src/mono/mono/metadata/gc.c +++ b/src/mono/mono/metadata/gc.c @@ -694,8 +694,8 @@ mono_gc_finalize_notify (void) if (mono_gc_is_null ()) return; -#if defined(HOST_WASI) - // TODO: Schedule the background job on WASI. Threads aren't yet supported in this build. +#if defined(HOST_WASI) && defined(DISABLE_THREADS) + mono_runtime_do_background_work (); #elif defined(HOST_WASM) && defined(DISABLE_THREADS) mono_main_thread_schedule_background_job (mono_runtime_do_background_work); #else From 4e63860fa07dff78ff4b9ec83a16740b65cb25ca Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Wed, 28 Aug 2024 21:12:24 +0200 Subject: [PATCH 2/5] cleanup --- .../Http/WasiHttpHandler/WasiHttpHandler.cs | 86 +++++++++---------- .../Http/WasiHttpHandler/WasiHttpInterop.cs | 3 +- .../Http/WasiHttpHandler/WasiInputStream.cs | 19 ++-- .../Http/WasiHttpHandler/WasiOutputStream.cs | 11 --- 4 files changed, 52 insertions(+), 67 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs index 04d20595c65b1..a1f01637f095f 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs @@ -23,38 +23,35 @@ internal sealed class WasiRequestWrapper : IDisposable { private FutureIncomingResponse? future; // owned by this instance private WasiOutputStream? wasiOutputStream; // owned by this instance - public IncomingResponse? incomingResponse; // owned by this instance - private readonly OutgoingRequest outgoingRequest; // owned by this instance - private readonly HttpRequestMessage request; - private readonly CancellationToken cancellationToken; + private WasiInputStream? incomingStream; // owned by this instance + public Task? requestBodyComplete; + public Task? requestComplete; private bool isDisposed; - public WasiRequestWrapper(HttpRequestMessage request, CancellationToken cancellationToken) + public async Task SendRequestAsync(HttpRequestMessage request, CancellationToken cancellationToken) { if (request.RequestUri is null) { throw new ArgumentException(); } - var requestHeaders = WasiHttpInterop.ConvertRequestHeaders(request); - outgoingRequest = new OutgoingRequest(requestHeaders); // we just passed the Fields ownership to OutgoingRequest - outgoingRequest.SetMethod(WasiHttpInterop.ConvertMethod(request.Method)); - outgoingRequest.SetScheme(WasiHttpInterop.ConvertScheme(request.RequestUri)); - outgoingRequest.SetAuthority(WasiHttpInterop.ConvertAuthority(request.RequestUri)); - outgoingRequest.SetPathWithQuery(request.RequestUri.PathAndQuery); + try + { + var requestHeaders = WasiHttpInterop.ConvertRequestHeaders(request); + var outgoingRequest = new OutgoingRequest(requestHeaders); // passing requestHeaders ownership + outgoingRequest.SetMethod(WasiHttpInterop.ConvertMethod(request.Method)); + outgoingRequest.SetScheme(WasiHttpInterop.ConvertScheme(request.RequestUri)); + outgoingRequest.SetAuthority(WasiHttpInterop.ConvertAuthority(request.RequestUri)); + outgoingRequest.SetPathWithQuery(request.RequestUri.PathAndQuery); - this.request = request; - this.cancellationToken = cancellationToken; - } + requestBodyComplete = SendContent(request.Content, outgoingRequest, cancellationToken); + future = OutgoingHandlerInterop.Handle(outgoingRequest, null); - public async Task SendRequestAsync() - { - try - { - requestBodyComplete = SendContent(); - incomingResponse = await SendRequest().ConfigureAwait(false); + requestComplete = SendRequest(cancellationToken); + + var incomingResponse = await requestComplete.ConfigureAwait(false); ObjectDisposedException.ThrowIf(isDisposed, this); cancellationToken.ThrowIfCancellationRequested(); @@ -67,35 +64,38 @@ public async Task SendRequestAsync() // we will leave scope of this method // we need to pass the ownership of the request and this wrapper to the response (via response content stream) // unless we know that we are not streaming anymore - WasiInputStream incomingStream = new WasiInputStream(this);// passing self ownership + incomingStream = new WasiInputStream(this, incomingResponse);// passing self ownership, passing incomingResponse ownership response.Content = new StreamContent(incomingStream); // passing incomingStream ownership to SendAsync() caller + incomingResponse.Dispose(); + return response; } - catch + catch (WitException e) + { + Dispose(); + throw new HttpRequestException(WasiHttpInterop.ErrorCodeToString((ErrorCode)e.Value), e); + } + catch (Exception) { Dispose(); throw; } } - private async Task SendRequest() + private async Task SendRequest(CancellationToken cancellationToken) { - Console.WriteLine("SendRequestAsync A"); try { - future = OutgoingHandlerInterop.Handle(outgoingRequest, null); - while (true) { - var response = (Result, None>?)future.Get(); + var response = (Result, None>?)future!.Get(); if (response.HasValue) { var result = response.Value.AsOk; if (result.IsOk) { - Console.WriteLine("SendRequestAsync: response is OK"); return result.AsOk; } else @@ -105,7 +105,6 @@ private async Task SendRequest() } else { - Console.WriteLine("SendRequestAsync B"); await WasiHttpInterop.RegisterWasiPollable(future.Subscribe(), cancellationToken).ConfigureAwait(false); } } @@ -118,15 +117,10 @@ private async Task SendRequest() } throw; } - catch (WitException e) - { - throw new HttpRequestException(WasiHttpInterop.ErrorCodeToString((ErrorCode)e.Value), e); - } } - public async Task SendContent() + public async Task SendContent(HttpContent? content, OutgoingRequest outgoingRequest, CancellationToken cancellationToken) { - var content = request.Content; if (content is not null) { wasiOutputStream = new WasiOutputStream(outgoingRequest.Body()); // passing body ownership @@ -135,20 +129,22 @@ public async Task SendContent() } } + ~WasiRequestWrapper() + { + Dispose(); + GC.SuppressFinalize(this); + } + public void Dispose() { if (!isDisposed) { isDisposed = true; - Console.WriteLine("WasiRequestWrapper.Dispose A"); wasiOutputStream?.Dispose(); - Console.WriteLine("WasiRequestWrapper.Dispose B"); - incomingResponse?.Dispose(); - Console.WriteLine("WasiRequestWrapper.Dispose C"); - outgoingRequest.Dispose(); - Console.WriteLine("WasiRequestWrapper.Dispose D"); - future?.Dispose(); - Console.WriteLine("WasiRequestWrapper.Dispose E"); + incomingStream?.Dispose(); + + // TODO why this fails ? + // future?.Dispose(); } } } @@ -164,10 +160,10 @@ internal sealed class WasiHttpHandler : HttpMessageHandler protected internal override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) { - var wasiRequest = new WasiRequestWrapper(request, cancellationToken); + var wasiRequest = new WasiRequestWrapper(); try { - return await wasiRequest.SendRequestAsync().ConfigureAwait(false); + return await wasiRequest.SendRequestAsync(request, cancellationToken).ConfigureAwait(false); } catch { diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs index b53ebcd999ffa..93b64c2dd8b73 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs @@ -140,7 +140,7 @@ public static Fields ConvertRequestHeaders(HttpRequestMessage request) public static void ConvertResponseHeaders(IncomingResponse incomingResponse, HttpResponseMessage response) { - using var headers = incomingResponse.Headers(); + var headers = incomingResponse.Headers(); foreach ((var key, var value) in headers.Entries()) { var valueString = Encoding.UTF8.GetString(value); @@ -153,6 +153,7 @@ public static void ConvertResponseHeaders(IncomingResponse incomingResponse, Htt response.Headers.Add(key, valueString); } } + headers.Dispose(); } private static bool IsContentHeader(string headerName) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs index 4993408f172e0..10f4c37579434 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs @@ -28,10 +28,11 @@ internal sealed class WasiInputStream : Stream public override bool CanWrite => false; public override bool CanSeek => false; - public WasiInputStream(WasiRequestWrapper wrapper) + public WasiInputStream(WasiRequestWrapper wrapper, IncomingResponse incomingResponse) { this.wrapper = wrapper; - this.body = wrapper.incomingResponse!.Consume(); + this.body = incomingResponse.Consume(); + incomingResponse.Dispose(); this.stream = body.Stream(); } @@ -42,32 +43,30 @@ public WasiInputStream(WasiRequestWrapper wrapper) public override void Close() { - Console.WriteLine("WasiInputStream.Close " + isClosed); if (!isClosed) { isClosed = true; stream.Dispose(); var futureTrailers = IncomingBody.Finish(body); // we just passed body ownership to Finish futureTrailers.Dispose(); - if (wrapper.requestBodyComplete != null && wrapper.requestBodyComplete.IsCompleted) - { - Console.WriteLine("WasiInputStream.Close WRAPPER"); - wrapper.Dispose(); - } } base.Close(); } protected override void Dispose(bool disposing) { - Console.WriteLine("WasiInputStream.Dispose" + isClosed); if (!isClosed) { isClosed = true; stream.Dispose(); body.Dispose(); } - wrapper.Dispose(); + + if (disposing) + { + // this helps with disposing WIT resources at the Close() time of this stream, instead of waiting for the GC + wrapper.Dispose(); + } base.Dispose(disposing); } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiOutputStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiOutputStream.cs index 22740b85a3a76..5bf3ea3ddc6bf 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiOutputStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiOutputStream.cs @@ -36,35 +36,24 @@ public WasiOutputStream(OutgoingBody body) public override void Close() { - Console.WriteLine("WasiOutputStream.Close " + isClosed); if (!isClosed) { - Console.WriteLine("WasiOutputStream.Close A"); isClosed = true; - Console.WriteLine("WasiOutputStream.Close B"); stream.Dispose(); - Console.WriteLine("WasiOutputStream.Close C"); OutgoingBody.Finish(body, null); - Console.WriteLine("WasiOutputStream.Close D"); } base.Close(); - Console.WriteLine("WasiOutputStream.Close E"); } protected override void Dispose(bool disposing) { - Console.WriteLine("WasiOutputStream.Dispose" + isClosed); if (!isClosed) { - Console.WriteLine("WasiOutputStream.Dispose A"); isClosed = true; stream.Dispose(); - Console.WriteLine("WasiOutputStream.Dispose B"); body.Dispose(); - Console.WriteLine("WasiOutputStream.Dispose C"); } base.Dispose(disposing); - Console.WriteLine("WasiOutputStream.Dispose E"); } public override async Task WriteAsync( From 903c22bfdb5712d64e7a595e52ad9077c15a0e4e Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Wed, 28 Aug 2024 21:21:11 +0200 Subject: [PATCH 3/5] more cleanup --- .../src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs | 6 ++---- .../src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs | 3 +-- .../src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs | 5 ++--- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs index a1f01637f095f..f5d416cbb09cc 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs @@ -51,7 +51,7 @@ public async Task SendRequestAsync(HttpRequestMessage reque requestComplete = SendRequest(cancellationToken); - var incomingResponse = await requestComplete.ConfigureAwait(false); + using var incomingResponse = await requestComplete.ConfigureAwait(false); ObjectDisposedException.ThrowIf(isDisposed, this); cancellationToken.ThrowIfCancellationRequested(); @@ -64,11 +64,9 @@ public async Task SendRequestAsync(HttpRequestMessage reque // we will leave scope of this method // we need to pass the ownership of the request and this wrapper to the response (via response content stream) // unless we know that we are not streaming anymore - incomingStream = new WasiInputStream(this, incomingResponse);// passing self ownership, passing incomingResponse ownership + incomingStream = new WasiInputStream(this, incomingResponse.Consume());// passing self ownership, passing body ownership response.Content = new StreamContent(incomingStream); // passing incomingStream ownership to SendAsync() caller - incomingResponse.Dispose(); - return response; } catch (WitException e) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs index 93b64c2dd8b73..b53ebcd999ffa 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs @@ -140,7 +140,7 @@ public static Fields ConvertRequestHeaders(HttpRequestMessage request) public static void ConvertResponseHeaders(IncomingResponse incomingResponse, HttpResponseMessage response) { - var headers = incomingResponse.Headers(); + using var headers = incomingResponse.Headers(); foreach ((var key, var value) in headers.Entries()) { var valueString = Encoding.UTF8.GetString(value); @@ -153,7 +153,6 @@ public static void ConvertResponseHeaders(IncomingResponse incomingResponse, Htt response.Headers.Add(key, valueString); } } - headers.Dispose(); } private static bool IsContentHeader(string headerName) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs index 10f4c37579434..95c5577e3b28f 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiInputStream.cs @@ -28,11 +28,10 @@ internal sealed class WasiInputStream : Stream public override bool CanWrite => false; public override bool CanSeek => false; - public WasiInputStream(WasiRequestWrapper wrapper, IncomingResponse incomingResponse) + public WasiInputStream(WasiRequestWrapper wrapper, IncomingBody body) { this.wrapper = wrapper; - this.body = incomingResponse.Consume(); - incomingResponse.Dispose(); + this.body = body; this.stream = body.Stream(); } From 5a6734122090b16b35b1dca6062c1a3c60b94149 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Thu, 29 Aug 2024 10:51:25 +0200 Subject: [PATCH 4/5] fix disposal of pollable --- .../src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs | 4 +--- .../src/System/Threading/Wasi/WasiEventLoop.cs | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs index f5d416cbb09cc..f4e7a5925c22c 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs @@ -140,9 +140,7 @@ public void Dispose() isDisposed = true; wasiOutputStream?.Dispose(); incomingStream?.Dispose(); - - // TODO why this fails ? - // future?.Dispose(); + future?.Dispose(); } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Wasi/WasiEventLoop.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Wasi/WasiEventLoop.cs index d52d443bd518d..450bd84898d3a 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Wasi/WasiEventLoop.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Wasi/WasiEventLoop.cs @@ -100,9 +100,9 @@ public void ResolveAndDispose() // no need to unregister the holder from s_pollables, when this is called isDisposed = true; - taskCompletionSource.TrySetResult(); pollable.Dispose(); cancellationTokenRegistration.Dispose(); + taskCompletionSource.TrySetResult(); } // for GC of abandoned Tasks or for cancellation @@ -116,9 +116,9 @@ private static void CancelAndDispose(object? s) // it will be removed from s_pollables on the next run self.isDisposed = true; - self.taskCompletionSource.TrySetCanceled(self.cancellationToken); self.pollable.Dispose(); self.cancellationTokenRegistration.Dispose(); + self.taskCompletionSource.TrySetCanceled(self.cancellationToken); } } } From cbfb6b545dbf9b73157c03313bab7dd4a7edd3a2 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Thu, 29 Aug 2024 12:51:33 +0200 Subject: [PATCH 5/5] feedback --- .../Http/WasiHttpHandler/WasiHttpHandler.cs | 17 +++++-------- .../Http/WasiHttpHandler/WasiHttpInterop.cs | 25 ++++++++++++++++++- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs index f4e7a5925c22c..aeebaa718498d 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpHandler.cs @@ -21,12 +21,9 @@ namespace System.Net.Http // on top of https://github.com/WebAssembly/wasi-http/blob/main/wit/types.wit internal sealed class WasiRequestWrapper : IDisposable { + private WasiOutputStream? wasiOutputStream; // disposal could be made earlier from this instance's Dispose + private WasiInputStream? incomingStream; // disposal could be made earlier from this instance's Dispose private FutureIncomingResponse? future; // owned by this instance - private WasiOutputStream? wasiOutputStream; // owned by this instance - private WasiInputStream? incomingStream; // owned by this instance - - public Task? requestBodyComplete; - public Task? requestComplete; private bool isDisposed; public async Task SendRequestAsync(HttpRequestMessage request, CancellationToken cancellationToken) @@ -45,13 +42,13 @@ public async Task SendRequestAsync(HttpRequestMessage reque outgoingRequest.SetAuthority(WasiHttpInterop.ConvertAuthority(request.RequestUri)); outgoingRequest.SetPathWithQuery(request.RequestUri.PathAndQuery); - requestBodyComplete = SendContent(request.Content, outgoingRequest, cancellationToken); +#pragma warning disable CS4014 // intentionaly not awaited + SendContent(request.Content, outgoingRequest, cancellationToken); +#pragma warning restore CS4014 future = OutgoingHandlerInterop.Handle(outgoingRequest, null); - requestComplete = SendRequest(cancellationToken); - - using var incomingResponse = await requestComplete.ConfigureAwait(false); + using var incomingResponse = await SendRequest(cancellationToken).ConfigureAwait(false); ObjectDisposedException.ThrowIf(isDisposed, this); cancellationToken.ThrowIfCancellationRequested(); @@ -71,12 +68,10 @@ public async Task SendRequestAsync(HttpRequestMessage reque } catch (WitException e) { - Dispose(); throw new HttpRequestException(WasiHttpInterop.ErrorCodeToString((ErrorCode)e.Value), e); } catch (Exception) { - Dispose(); throw; } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs index b53ebcd999ffa..d9dcee595cb24 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/WasiHttpHandler/WasiHttpInterop.cs @@ -135,7 +135,30 @@ public static Fields ConvertRequestHeaders(HttpRequestMessage request) } } } - return Fields.FromList(headers); + try + { + return Fields.FromList(headers); + } + catch (WitException e) + { + var error = HeaderErrorToString((HeaderError)e.Value); + throw new HttpRequestException($"Header validation error: {error}"); + } + } + + private static string HeaderErrorToString(HeaderError error) + { + switch (error.Tag) + { + case ITypes.HeaderError.INVALID_SYNTAX: + return "INVALID_SYNTAX"; + case ITypes.HeaderError.FORBIDDEN: + return "FORBIDDEN"; + case ITypes.HeaderError.IMMUTABLE: + return "IMMUTABLE"; + default: + return $"{error.Tag}"; + } } public static void ConvertResponseHeaders(IncomingResponse incomingResponse, HttpResponseMessage response)