Skip to content

Commit

Permalink
[System]: Use new WebCompletionSource instead of `TaskCompletionSou…
Browse files Browse the repository at this point in the history
…rce`. (mono#7293)

This new helper class uses `TaskCompletionSource<>` internally, but instead
of using TrySetException(), it sets a result containing an `ExceptionDispatchInfo`.

The problem with using TrySetException() is that it wraps the exception
object in an `AggregateException`, that would be unobserved if the caller
throws the original exception.
  • Loading branch information
Martin Baulig authored and UnityAlex committed Aug 29, 2019
1 parent c847b46 commit 0a75474
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 38 deletions.
21 changes: 11 additions & 10 deletions mcs/class/System/System.Net/HttpWebRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class HttpWebRequest : WebRequest, ISerializable

WebRequestStream writeStream;
HttpWebResponse webResponse;
TaskCompletionSource<HttpWebResponse> responseTask;
WebCompletionSource responseTask;
WebOperation currentOperation;
int aborted;
bool gotRequestStream;
Expand Down Expand Up @@ -968,15 +968,16 @@ async Task<HttpWebResponse> MyGetResponseAsync (CancellationToken cancellationTo
if (!sendChunked && transferEncoding != null && transferEncoding.Trim () != "")
throw new ProtocolViolationException ("SendChunked should be true.");

var myTcs = new TaskCompletionSource<HttpWebResponse> ();
var completion = new WebCompletionSource ();
WebOperation operation;
lock (locker) {
getResponseCalled = true;
var oldTcs = Interlocked.CompareExchange (ref responseTask, myTcs, null);
WebConnection.Debug ($"HWR GET RESPONSE: Req={ID} {oldTcs != null}");
if (oldTcs != null) {
if (haveResponse && oldTcs.Task.IsCompleted)
return oldTcs.Task.Result;
var oldCompletion = Interlocked.CompareExchange (ref responseTask, completion, null);
WebConnection.Debug ($"HWR GET RESPONSE: Req={ID} {oldCompletion != null}");
if (oldCompletion != null) {
oldCompletion.ThrowOnError ();
if (haveResponse && oldCompletion.IsCompleted)
return webResponse;
throw new InvalidOperationException ("Cannot re-call start of asynchronous " +
"method while a previous call is still in progress.");
}
Expand Down Expand Up @@ -1023,14 +1024,14 @@ async Task<HttpWebResponse> MyGetResponseAsync (CancellationToken cancellationTo
if (throwMe != null) {
WebConnection.Debug ($"HWR GET RESPONSE LOOP #1 EX: Req={ID} {throwMe.Status} {throwMe.InnerException?.GetType ()}");
haveResponse = true;
myTcs.TrySetException (throwMe);
completion.TrySetException (throwMe);
throw throwMe;
}

if (!redirect) {
haveResponse = true;
webResponse = response;
myTcs.TrySetResult (response);
completion.TrySetCompleted ();
return response;
}

Expand All @@ -1056,7 +1057,7 @@ async Task<HttpWebResponse> MyGetResponseAsync (CancellationToken cancellationTo
WebConnection.Debug ($"HWR GET RESPONSE LOOP #3 EX: Req={ID} {throwMe.Status} {throwMe.InnerException?.GetType ()}");
haveResponse = true;
stream?.Close ();
myTcs.TrySetException (throwMe);
completion.TrySetException (throwMe);
throw throwMe;
}

Expand Down
103 changes: 103 additions & 0 deletions mcs/class/System/System.Net/WebCompletionSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//
// WebCompletionSource.cs
//
// Author:
// Martin Baulig <[email protected]>
//
// Copyright (c) 2018 Xamarin Inc. (http://www.xamarin.com)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.ExceptionServices;

namespace System.Net
{
class WebCompletionSource
{
TaskCompletionSource<Result> completion;

public WebCompletionSource ()
{
completion = new TaskCompletionSource<Result> ();
}

public bool TrySetCompleted ()
{
return completion.TrySetResult (new Result (State.Completed, null));
}

public bool TrySetCanceled ()
{
var error = new OperationCanceledException ();
var result = new Result (State.Canceled, ExceptionDispatchInfo.Capture (error));
return completion.TrySetResult (result);
}

public bool TrySetException (Exception error)
{
var result = new Result (State.Faulted, ExceptionDispatchInfo.Capture (error));
return completion.TrySetResult (result);
}

public bool IsCompleted => completion.Task.IsCompleted;

public void ThrowOnError ()
{
if (!completion.Task.IsCompleted)
return;
completion.Task.Result.Error?.Throw ();
}

public async Task<bool> WaitForCompletion (bool throwOnError)
{
var result = await completion.Task.ConfigureAwait (false);
if (result.State == State.Completed)
return true;
if (throwOnError)
result.Error.Throw ();
return false;
}

enum State : int {
Running,
Completed,
Canceled,
Faulted
}

class Result
{
public State State {
get;
}

public ExceptionDispatchInfo Error {
get;
}

public Result (State state, ExceptionDispatchInfo error)
{
State = state;
Error = error;
}
}
}
}
19 changes: 10 additions & 9 deletions mcs/class/System/System.Net/WebRequestStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class WebRequestStream : WebConnectionStream
bool requestWritten;
bool allowBuffering;
bool sendChunked;
TaskCompletionSource<int> pendingWrite;
WebCompletionSource pendingWrite;
long totalWritten;
byte[] headers;
bool headersSent;
Expand Down Expand Up @@ -149,8 +149,8 @@ public override async Task WriteAsync (byte[] buffer, int offset, int size, Canc
if (size < 0 || (length - offset) < size)
throw new ArgumentOutOfRangeException (nameof (size));

var myWriteTcs = new TaskCompletionSource<int> ();
if (Interlocked.CompareExchange (ref pendingWrite, myWriteTcs, null) != null)
var completion = new WebCompletionSource ();
if (Interlocked.CompareExchange (ref pendingWrite, completion, null) != null)
throw new InvalidOperationException (SR.GetString (SR.net_repcall));

try {
Expand All @@ -162,7 +162,7 @@ public override async Task WriteAsync (byte[] buffer, int offset, int size, Canc
await FinishWriting (cancellationToken);

pendingWrite = null;
myWriteTcs.TrySetResult (0);
completion.TrySetCompleted ();
} catch (Exception ex) {
KillBuffer ();
closed = true;
Expand All @@ -175,7 +175,7 @@ public override async Task WriteAsync (byte[] buffer, int offset, int size, Canc
Operation.CompleteRequestWritten (this, ex);

pendingWrite = null;
myWriteTcs.TrySetException (ex);
completion.TrySetException (ex);
throw;
}
}
Expand Down Expand Up @@ -364,11 +364,12 @@ async Task WriteChunkTrailer ()
cts.CancelAfter (WriteTimeout);
var timeoutTask = Task.Delay (WriteTimeout);
while (true) {
var myWriteTcs = new TaskCompletionSource<int> ();
var oldTcs = Interlocked.CompareExchange (ref pendingWrite, myWriteTcs, null);
if (oldTcs == null)
var completion = new WebCompletionSource ();
var oldCompletion = Interlocked.CompareExchange (ref pendingWrite, completion, null);
if (oldCompletion == null)
break;
var ret = await Task.WhenAny (timeoutTask, oldTcs.Task).ConfigureAwait (false);
var oldWriteTask = oldCompletion.WaitForCompletion (true);
var ret = await Task.WhenAny (timeoutTask, oldWriteTask).ConfigureAwait (false);
if (ret == timeoutTask)
throw new WebException ("The operation has timed out.", WebExceptionStatus.Timeout);
}
Expand Down
39 changes: 20 additions & 19 deletions mcs/class/System/System.Net/WebResponseStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class WebResponseStream : WebConnectionStream
long totalRead;
bool nextReadCalled;
int stream_length; // -1 when CL not present
TaskCompletionSource<int> readTcs;
WebCompletionSource pendingRead;
object locker = new object ();
int nestedRead;
bool read_eof;
Expand Down Expand Up @@ -126,16 +126,16 @@ public override async Task<int> ReadAsync (byte[] buffer, int offset, int size,
if (Interlocked.CompareExchange (ref nestedRead, 1, 0) != 0)
throw new InvalidOperationException ("Invalid nested call.");

var myReadTcs = new TaskCompletionSource<int> ();
var completion = new WebCompletionSource ();
while (!cancellationToken.IsCancellationRequested) {
/*
* 'readTcs' is set by ReadAllAsync().
* 'currentRead' is set by ReadAllAsync().
*/
var oldReadTcs = Interlocked.CompareExchange (ref readTcs, myReadTcs, null);
WebConnection.Debug ($"{ME} READ ASYNC #1: {oldReadTcs != null}");
if (oldReadTcs == null)
var oldCompletion = Interlocked.CompareExchange (ref pendingRead, completion, null);
WebConnection.Debug ($"{ME} READ ASYNC #1: {oldCompletion != null}");
if (oldCompletion == null)
break;
await oldReadTcs.Task.ConfigureAwait (false);
await oldCompletion.WaitForCompletion (true).ConfigureAwait (false);
}

WebConnection.Debug ($"{ME} READ ASYNC #2: {totalRead} {contentLength}");
Expand All @@ -159,8 +159,8 @@ public override async Task<int> ReadAsync (byte[] buffer, int offset, int size,

if (throwMe != null) {
lock (locker) {
myReadTcs.TrySetException (throwMe);
readTcs = null;
completion.TrySetException (throwMe);
pendingRead = null;
nestedRead = 0;
}

Expand All @@ -170,8 +170,8 @@ public override async Task<int> ReadAsync (byte[] buffer, int offset, int size,
}

lock (locker) {
readTcs.TrySetResult (oldBytes + nbytes);
readTcs = null;
pendingRead.TrySetCompleted ();
pendingRead = null;
nestedRead = 0;
}

Expand Down Expand Up @@ -406,18 +406,19 @@ internal async Task ReadAllAsync (bool resending, CancellationToken cancellation
}

var timeoutTask = Task.Delay (ReadTimeout);
var myReadTcs = new TaskCompletionSource<int> ();
var completion = new WebCompletionSource ();
while (true) {
/*
* 'readTcs' is set by ReadAsync().
* 'currentRead' is set by ReadAsync().
*/
cancellationToken.ThrowIfCancellationRequested ();
var oldReadTcs = Interlocked.CompareExchange (ref readTcs, myReadTcs, null);
if (oldReadTcs == null)
var oldCompletion = Interlocked.CompareExchange (ref pendingRead, completion, null);
if (oldCompletion == null)
break;

// ReadAsync() is in progress.
var anyTask = await Task.WhenAny (oldReadTcs.Task, timeoutTask).ConfigureAwait (false);
var oldReadTask = oldCompletion.WaitForCompletion (true);
var anyTask = await Task.WhenAny (oldReadTask, timeoutTask).ConfigureAwait (false);
if (anyTask == timeoutTask)
throw new WebException ("The operation has timed out.", WebExceptionStatus.Timeout);
}
Expand Down Expand Up @@ -496,14 +497,14 @@ internal async Task ReadAllAsync (bool resending, CancellationToken cancellation
readBuffer = new BufferOffsetSize (b, 0, new_size, false);
totalRead = 0;
nextReadCalled = true;
myReadTcs.TrySetResult (new_size);
completion.TrySetCompleted ();
} catch (Exception ex) {
WebConnection.Debug ($"{ME} READ ALL ASYNC EX: {ex.Message}");
myReadTcs.TrySetException (ex);
completion.TrySetException (ex);
throw;
} finally {
WebConnection.Debug ($"{ME} READ ALL ASYNC #2");
readTcs = null;
pendingRead = null;
}

Operation.CompleteResponseRead (true);
Expand Down
1 change: 1 addition & 0 deletions mcs/class/System/common_networking.sources
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ System.Net/ServicePoint.cs
System.Net/ServicePointManager.cs
System.Net/ServicePointManager.extra.cs
System.Net/ServicePointScheduler.cs
System.Net/WebCompletionSource.cs
System.Net/WebConnection.cs
System.Net/WebConnectionStream.cs
System.Net/WebConnectionTunnel.cs
Expand Down

0 comments on commit 0a75474

Please sign in to comment.