From 0a75474cfc74cbe77031309a8b9af1ae76d0a2da Mon Sep 17 00:00:00 2001 From: Martin Baulig Date: Wed, 28 Feb 2018 03:37:24 -0500 Subject: [PATCH] [System]: Use new `WebCompletionSource` instead of `TaskCompletionSource`. (#7293) This new helper class uses `TaskCompletionSource<>` internally, but instead of using TrySetException(), it sets a result containing an `ExceptionDispatchInfo`. The problem with using TrySetException() is that it wraps the exception object in an `AggregateException`, that would be unobserved if the caller throws the original exception. --- mcs/class/System/System.Net/HttpWebRequest.cs | 21 ++-- .../System/System.Net/WebCompletionSource.cs | 103 ++++++++++++++++++ .../System/System.Net/WebRequestStream.cs | 19 ++-- .../System/System.Net/WebResponseStream.cs | 39 +++---- mcs/class/System/common_networking.sources | 1 + 5 files changed, 145 insertions(+), 38 deletions(-) create mode 100644 mcs/class/System/System.Net/WebCompletionSource.cs diff --git a/mcs/class/System/System.Net/HttpWebRequest.cs b/mcs/class/System/System.Net/HttpWebRequest.cs index e54736439e20..8102997df03b 100644 --- a/mcs/class/System/System.Net/HttpWebRequest.cs +++ b/mcs/class/System/System.Net/HttpWebRequest.cs @@ -96,7 +96,7 @@ public class HttpWebRequest : WebRequest, ISerializable WebRequestStream writeStream; HttpWebResponse webResponse; - TaskCompletionSource responseTask; + WebCompletionSource responseTask; WebOperation currentOperation; int aborted; bool gotRequestStream; @@ -968,15 +968,16 @@ async Task MyGetResponseAsync (CancellationToken cancellationTo if (!sendChunked && transferEncoding != null && transferEncoding.Trim () != "") throw new ProtocolViolationException ("SendChunked should be true."); - var myTcs = new TaskCompletionSource (); + var completion = new WebCompletionSource (); WebOperation operation; lock (locker) { getResponseCalled = true; - var oldTcs = Interlocked.CompareExchange (ref responseTask, myTcs, null); - WebConnection.Debug ($"HWR GET RESPONSE: Req={ID} {oldTcs != null}"); - if (oldTcs != null) { - if (haveResponse && oldTcs.Task.IsCompleted) - return oldTcs.Task.Result; + var oldCompletion = Interlocked.CompareExchange (ref responseTask, completion, null); + WebConnection.Debug ($"HWR GET RESPONSE: Req={ID} {oldCompletion != null}"); + if (oldCompletion != null) { + oldCompletion.ThrowOnError (); + if (haveResponse && oldCompletion.IsCompleted) + return webResponse; throw new InvalidOperationException ("Cannot re-call start of asynchronous " + "method while a previous call is still in progress."); } @@ -1023,14 +1024,14 @@ async Task MyGetResponseAsync (CancellationToken cancellationTo if (throwMe != null) { WebConnection.Debug ($"HWR GET RESPONSE LOOP #1 EX: Req={ID} {throwMe.Status} {throwMe.InnerException?.GetType ()}"); haveResponse = true; - myTcs.TrySetException (throwMe); + completion.TrySetException (throwMe); throw throwMe; } if (!redirect) { haveResponse = true; webResponse = response; - myTcs.TrySetResult (response); + completion.TrySetCompleted (); return response; } @@ -1056,7 +1057,7 @@ async Task MyGetResponseAsync (CancellationToken cancellationTo WebConnection.Debug ($"HWR GET RESPONSE LOOP #3 EX: Req={ID} {throwMe.Status} {throwMe.InnerException?.GetType ()}"); haveResponse = true; stream?.Close (); - myTcs.TrySetException (throwMe); + completion.TrySetException (throwMe); throw throwMe; } diff --git a/mcs/class/System/System.Net/WebCompletionSource.cs b/mcs/class/System/System.Net/WebCompletionSource.cs new file mode 100644 index 000000000000..f90dbf50e8da --- /dev/null +++ b/mcs/class/System/System.Net/WebCompletionSource.cs @@ -0,0 +1,103 @@ +// +// WebCompletionSource.cs +// +// Author: +// Martin Baulig +// +// Copyright (c) 2018 Xamarin Inc. (http://www.xamarin.com) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Runtime.ExceptionServices; + +namespace System.Net +{ + class WebCompletionSource + { + TaskCompletionSource completion; + + public WebCompletionSource () + { + completion = new TaskCompletionSource (); + } + + public bool TrySetCompleted () + { + return completion.TrySetResult (new Result (State.Completed, null)); + } + + public bool TrySetCanceled () + { + var error = new OperationCanceledException (); + var result = new Result (State.Canceled, ExceptionDispatchInfo.Capture (error)); + return completion.TrySetResult (result); + } + + public bool TrySetException (Exception error) + { + var result = new Result (State.Faulted, ExceptionDispatchInfo.Capture (error)); + return completion.TrySetResult (result); + } + + public bool IsCompleted => completion.Task.IsCompleted; + + public void ThrowOnError () + { + if (!completion.Task.IsCompleted) + return; + completion.Task.Result.Error?.Throw (); + } + + public async Task WaitForCompletion (bool throwOnError) + { + var result = await completion.Task.ConfigureAwait (false); + if (result.State == State.Completed) + return true; + if (throwOnError) + result.Error.Throw (); + return false; + } + + enum State : int { + Running, + Completed, + Canceled, + Faulted + } + + class Result + { + public State State { + get; + } + + public ExceptionDispatchInfo Error { + get; + } + + public Result (State state, ExceptionDispatchInfo error) + { + State = state; + Error = error; + } + } + } +} diff --git a/mcs/class/System/System.Net/WebRequestStream.cs b/mcs/class/System/System.Net/WebRequestStream.cs index c0cf5829764f..5077ac56cd00 100644 --- a/mcs/class/System/System.Net/WebRequestStream.cs +++ b/mcs/class/System/System.Net/WebRequestStream.cs @@ -39,7 +39,7 @@ class WebRequestStream : WebConnectionStream bool requestWritten; bool allowBuffering; bool sendChunked; - TaskCompletionSource pendingWrite; + WebCompletionSource pendingWrite; long totalWritten; byte[] headers; bool headersSent; @@ -149,8 +149,8 @@ public override async Task WriteAsync (byte[] buffer, int offset, int size, Canc if (size < 0 || (length - offset) < size) throw new ArgumentOutOfRangeException (nameof (size)); - var myWriteTcs = new TaskCompletionSource (); - if (Interlocked.CompareExchange (ref pendingWrite, myWriteTcs, null) != null) + var completion = new WebCompletionSource (); + if (Interlocked.CompareExchange (ref pendingWrite, completion, null) != null) throw new InvalidOperationException (SR.GetString (SR.net_repcall)); try { @@ -162,7 +162,7 @@ public override async Task WriteAsync (byte[] buffer, int offset, int size, Canc await FinishWriting (cancellationToken); pendingWrite = null; - myWriteTcs.TrySetResult (0); + completion.TrySetCompleted (); } catch (Exception ex) { KillBuffer (); closed = true; @@ -175,7 +175,7 @@ public override async Task WriteAsync (byte[] buffer, int offset, int size, Canc Operation.CompleteRequestWritten (this, ex); pendingWrite = null; - myWriteTcs.TrySetException (ex); + completion.TrySetException (ex); throw; } } @@ -364,11 +364,12 @@ async Task WriteChunkTrailer () cts.CancelAfter (WriteTimeout); var timeoutTask = Task.Delay (WriteTimeout); while (true) { - var myWriteTcs = new TaskCompletionSource (); - var oldTcs = Interlocked.CompareExchange (ref pendingWrite, myWriteTcs, null); - if (oldTcs == null) + var completion = new WebCompletionSource (); + var oldCompletion = Interlocked.CompareExchange (ref pendingWrite, completion, null); + if (oldCompletion == null) break; - var ret = await Task.WhenAny (timeoutTask, oldTcs.Task).ConfigureAwait (false); + var oldWriteTask = oldCompletion.WaitForCompletion (true); + var ret = await Task.WhenAny (timeoutTask, oldWriteTask).ConfigureAwait (false); if (ret == timeoutTask) throw new WebException ("The operation has timed out.", WebExceptionStatus.Timeout); } diff --git a/mcs/class/System/System.Net/WebResponseStream.cs b/mcs/class/System/System.Net/WebResponseStream.cs index 75ec4b7d6c3b..d6fce9fe5d82 100644 --- a/mcs/class/System/System.Net/WebResponseStream.cs +++ b/mcs/class/System/System.Net/WebResponseStream.cs @@ -41,7 +41,7 @@ class WebResponseStream : WebConnectionStream long totalRead; bool nextReadCalled; int stream_length; // -1 when CL not present - TaskCompletionSource readTcs; + WebCompletionSource pendingRead; object locker = new object (); int nestedRead; bool read_eof; @@ -126,16 +126,16 @@ public override async Task ReadAsync (byte[] buffer, int offset, int size, if (Interlocked.CompareExchange (ref nestedRead, 1, 0) != 0) throw new InvalidOperationException ("Invalid nested call."); - var myReadTcs = new TaskCompletionSource (); + var completion = new WebCompletionSource (); while (!cancellationToken.IsCancellationRequested) { /* - * 'readTcs' is set by ReadAllAsync(). + * 'currentRead' is set by ReadAllAsync(). */ - var oldReadTcs = Interlocked.CompareExchange (ref readTcs, myReadTcs, null); - WebConnection.Debug ($"{ME} READ ASYNC #1: {oldReadTcs != null}"); - if (oldReadTcs == null) + var oldCompletion = Interlocked.CompareExchange (ref pendingRead, completion, null); + WebConnection.Debug ($"{ME} READ ASYNC #1: {oldCompletion != null}"); + if (oldCompletion == null) break; - await oldReadTcs.Task.ConfigureAwait (false); + await oldCompletion.WaitForCompletion (true).ConfigureAwait (false); } WebConnection.Debug ($"{ME} READ ASYNC #2: {totalRead} {contentLength}"); @@ -159,8 +159,8 @@ public override async Task ReadAsync (byte[] buffer, int offset, int size, if (throwMe != null) { lock (locker) { - myReadTcs.TrySetException (throwMe); - readTcs = null; + completion.TrySetException (throwMe); + pendingRead = null; nestedRead = 0; } @@ -170,8 +170,8 @@ public override async Task ReadAsync (byte[] buffer, int offset, int size, } lock (locker) { - readTcs.TrySetResult (oldBytes + nbytes); - readTcs = null; + pendingRead.TrySetCompleted (); + pendingRead = null; nestedRead = 0; } @@ -406,18 +406,19 @@ internal async Task ReadAllAsync (bool resending, CancellationToken cancellation } var timeoutTask = Task.Delay (ReadTimeout); - var myReadTcs = new TaskCompletionSource (); + var completion = new WebCompletionSource (); while (true) { /* - * 'readTcs' is set by ReadAsync(). + * 'currentRead' is set by ReadAsync(). */ cancellationToken.ThrowIfCancellationRequested (); - var oldReadTcs = Interlocked.CompareExchange (ref readTcs, myReadTcs, null); - if (oldReadTcs == null) + var oldCompletion = Interlocked.CompareExchange (ref pendingRead, completion, null); + if (oldCompletion == null) break; // ReadAsync() is in progress. - var anyTask = await Task.WhenAny (oldReadTcs.Task, timeoutTask).ConfigureAwait (false); + var oldReadTask = oldCompletion.WaitForCompletion (true); + var anyTask = await Task.WhenAny (oldReadTask, timeoutTask).ConfigureAwait (false); if (anyTask == timeoutTask) throw new WebException ("The operation has timed out.", WebExceptionStatus.Timeout); } @@ -496,14 +497,14 @@ internal async Task ReadAllAsync (bool resending, CancellationToken cancellation readBuffer = new BufferOffsetSize (b, 0, new_size, false); totalRead = 0; nextReadCalled = true; - myReadTcs.TrySetResult (new_size); + completion.TrySetCompleted (); } catch (Exception ex) { WebConnection.Debug ($"{ME} READ ALL ASYNC EX: {ex.Message}"); - myReadTcs.TrySetException (ex); + completion.TrySetException (ex); throw; } finally { WebConnection.Debug ($"{ME} READ ALL ASYNC #2"); - readTcs = null; + pendingRead = null; } Operation.CompleteResponseRead (true); diff --git a/mcs/class/System/common_networking.sources b/mcs/class/System/common_networking.sources index 3767fc1c1044..1327ddfb3c14 100644 --- a/mcs/class/System/common_networking.sources +++ b/mcs/class/System/common_networking.sources @@ -43,6 +43,7 @@ System.Net/ServicePoint.cs System.Net/ServicePointManager.cs System.Net/ServicePointManager.extra.cs System.Net/ServicePointScheduler.cs +System.Net/WebCompletionSource.cs System.Net/WebConnection.cs System.Net/WebConnectionStream.cs System.Net/WebConnectionTunnel.cs