Skip to content

Commit

Permalink
Merge pull request dotnet#427 from shhsu/master
Browse files Browse the repository at this point in the history
Allow caller to await on TimeoutHelper.CancelationToken
  • Loading branch information
Peter Hsu committed Oct 26, 2015
2 parents a960d98 + bd8cdd1 commit 24e6cdd
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,69 +8,54 @@

namespace System.Runtime
{
public struct TimeoutHelper : IDisposable
public struct TimeoutHelper
{
public static readonly TimeSpan MaxWait = TimeSpan.FromMilliseconds(Int32.MaxValue);
private static readonly CancellationToken s_precancelledToken = new CancellationToken(true);

private bool _cancellationTokenInitialized;
private bool _deadlineSet;

private CancellationToken _cancellationToken;
private CancellationTokenSource _cts;
private DateTime _deadline;
private TimeSpan _originalTimeout;
public static readonly TimeSpan MaxWait = TimeSpan.FromMilliseconds(Int32.MaxValue);
private static Action<object> s_cancelOnTimeout = state => ((TimeoutHelper)state)._cts.Cancel();

public TimeoutHelper(TimeSpan timeout)
{
Contract.Assert(timeout >= TimeSpan.Zero, "timeout must be non-negative");

_cancellationTokenInitialized = false;
_cts = null;
_originalTimeout = timeout;
_deadline = DateTime.MaxValue;
_deadlineSet = (timeout == TimeSpan.MaxValue);
}

// No locks as we expect this class to be used linearly.
// If another CancellationTokenSource is created, we might have a CancellationToken outstanding
// that isn't cancelled if _cts.Cancel() is called. This happens only on the Abort paths, so it's not an issue.
private void InitializeCancellationToken(TimeSpan timeout)
public CancellationToken GetCancellationToken()
{
if (timeout == TimeSpan.MaxValue || timeout == Timeout.InfiniteTimeSpan)
{
_cancellationToken = CancellationToken.None;
}
else if (timeout > TimeSpan.Zero)
{
_cts = new CancellationTokenSource();
_cancellationToken = _cts.Token;
TimeoutTokenSource.FromTimeout((int)timeout.TotalMilliseconds).Register(s_cancelOnTimeout, this);
}
else
{
_cancellationToken = new CancellationToken(true);
}
_cancellationTokenInitialized = true;
return GetCancellationTokenAsync().Result;
}

public CancellationToken CancellationToken
public async Task<CancellationToken> GetCancellationTokenAsync()
{
get
if (!_cancellationTokenInitialized)
{
if (!_cancellationTokenInitialized)
var timeout = RemainingTime();
if (timeout >= MaxWait || timeout == Timeout.InfiniteTimeSpan)
{
_cancellationToken = CancellationToken.None;
}
else if (timeout > TimeSpan.Zero)
{
InitializeCancellationToken(this.RemainingTime());
_cancellationToken = await TimeoutTokenSource.FromTimeoutAsync((int)timeout.TotalMilliseconds);
}
return _cancellationToken;
else
{
_cancellationToken = s_precancelledToken;
}
_cancellationTokenInitialized = true;
}
}

public void CancelCancellationToken(bool throwOnFirstException = false)
{
if (_cts != null)
{
_cts.Cancel(throwOnFirstException);
}
return _cancellationToken;
}

public TimeSpan OriginalTimeout
Expand Down Expand Up @@ -194,16 +179,6 @@ private void SetDeadline()
_deadlineSet = true;
}

public void Dispose()
{
if (_cancellationTokenInitialized && _cts !=null)
{
_cts.Dispose();
_cancellationTokenInitialized = false;
_cancellationToken = default(CancellationToken);
}
}

public static void ThrowIfNegativeArgument(TimeSpan timeout)
{
ThrowIfNegativeArgument(timeout, "timeout");
Expand Down Expand Up @@ -260,9 +235,29 @@ internal static TimeoutException CreateEnterTimedOutException(TimeSpan timeout)
/// </summary>
internal static class TimeoutTokenSource
{
private const int COALESCING_SPAN_MS = 15;
/// <summary>
/// These are constants use to calculate timeout coalescing, for more description see method FromTimeoutAsync
/// </summary>
private const int CoalescingFactor = 15;
private const int GranularityFactor = 2000;
private const int SegmentationFactor = CoalescingFactor * GranularityFactor;

private static readonly ConcurrentDictionary<long, Task<CancellationToken>> s_tokenCache =
new ConcurrentDictionary<long, Task<CancellationToken>>();
private static readonly Action<object> s_deregisterToken = (object state) =>
{
var args = (Tuple<long, CancellationTokenSource>)state;
Task<CancellationToken> ignored;
try
{
s_tokenCache.TryRemove(args.Item1, out ignored);
}
finally
{
args.Item2.Dispose();
}
};

public static CancellationToken FromTimeout(int millisecondsTimeout)
{
Expand All @@ -278,10 +273,25 @@ public static Task<CancellationToken> FromTimeoutAsync(int millisecondsTimeout)
throw new ArgumentOutOfRangeException("Invalid millisecondsTimeout value " + millisecondsTimeout);
}


// To prevent s_tokenCache growing too large, we have to adjust the granularity of the our coalesce depending
// on the value of millisecondsTimeout. The coalescing span scales proportionally with millisecondsTimeout which
// would garentee constant s_tokenCache size in the case where similar millisecondsTimeout values are accepted.
// If the method is given a wildly different millisecondsTimeout values all the time, the dictionary would still
// only grow logarithmically with respect to the range of the input values
uint currentTime = (uint)Environment.TickCount;
long targetTime = millisecondsTimeout + currentTime;
// round the targetTime up to the next closest 15ms
targetTime = ((targetTime + (COALESCING_SPAN_MS - 1)) / COALESCING_SPAN_MS) * COALESCING_SPAN_MS;

// Formula for our coalescing span:
// Divide millisecondsTimeout by SegmentationFactor and take the highest bit and then multiply CoalescingFactor back
var segmentValue = millisecondsTimeout / SegmentationFactor;
var coalescingSpanMs = CoalescingFactor;
while (segmentValue > 0)
{
segmentValue >>= 1;
coalescingSpanMs <<= 1;
}
targetTime = ((targetTime + (coalescingSpanMs - 1)) / coalescingSpanMs) * coalescingSpanMs;

Task<CancellationToken> tokenTask;

Expand All @@ -294,13 +304,11 @@ public static Task<CancellationToken> FromTimeoutAsync(int millisecondsTimeout)
{
// Since this thread was successful reserving a spot in the cache, it would be the only thread
// that construct the CancellationTokenSource
var token = new CancellationTokenSource((int)(targetTime - currentTime)).Token;
var tokenSource = new CancellationTokenSource((int)(targetTime - currentTime));
var token = tokenSource.Token;

// Clean up cache when Token is canceled
token.Register(t => {
Task<CancellationToken> ignored;
s_tokenCache.TryRemove((long)t, out ignored);
}, targetTime);
token.Register(s_deregisterToken, Tuple.Create(targetTime, tokenSource));

// set the result so other thread may observe the token, and return
tcs.TrySetResult(token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public override async Task<WebSocket> CreateWebSocketAsync(Uri address, WebHeade
webSocket.Options.SetRequestHeader(header, headers[header]);
}

await webSocket.ConnectAsync(address, timeoutHelper.CancellationToken);
var cancelToken = await timeoutHelper.GetCancellationTokenAsync();
await webSocket.ConnectAsync(address, cancelToken);
return webSocket;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,18 @@ internal virtual void OnHttpRequestCompleted(HttpRequestMessage request)

internal class HttpClientChannelAsyncRequest : IAsyncRequest
{
private static readonly Action<object> s_cancelCts = state =>
{
try
{
((CancellationTokenSource)state).Cancel();
}
catch (ObjectDisposedException)
{
// ignore
}
};
private HttpClientRequestChannel _channel;
private HttpChannelFactory<IRequestChannel> _factory;
private EndpointAddress _to;
Expand All @@ -627,6 +639,7 @@ internal class HttpClientChannelAsyncRequest : IAsyncRequest
private TimeoutHelper _timeoutHelper;
private int _httpRequestCompleted;
private HttpClient _httpClient;
private readonly CancellationTokenSource _httpSendCts;

public HttpClientChannelAsyncRequest(HttpClientRequestChannel channel)
{
Expand All @@ -635,6 +648,7 @@ public HttpClientChannelAsyncRequest(HttpClientRequestChannel channel)
_via = channel.Via;
_factory = channel.Factory;
_httpClient = _factory.GetHttpClient();
_httpSendCts = new CancellationTokenSource();
}

public async Task SendRequestAsync(Message message, TimeoutHelper timeoutHelper)
Expand Down Expand Up @@ -673,9 +687,13 @@ public async Task SendRequestAsync(Message message, TimeoutHelper timeoutHelper)

bool success = false;

var cancelTokenTask = _timeoutHelper.GetCancellationTokenAsync();

try
{
_httpResponseMessage = await _httpClient.SendAsync(_httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, _timeoutHelper.CancellationToken);
var timeoutToken = await cancelTokenTask;
timeoutToken.Register(s_cancelCts, _httpSendCts);
_httpResponseMessage = await _httpClient.SendAsync(_httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, _httpSendCts.Token);
// As we have the response message and no exceptions have been thrown, the request message has completed it's job.
// Calling Dispose() on the request message to free up resources in HttpContent, but keeping the object around
// as we can still query properties once dispose'd.
Expand All @@ -689,7 +707,7 @@ public async Task SendRequestAsync(Message message, TimeoutHelper timeoutHelper)
}
catch (OperationCanceledException)
{
if (_timeoutHelper.CancellationToken.IsCancellationRequested)
if (cancelTokenTask.Result.IsCancellationRequested)
{
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException(SR.Format(
SR.HttpRequestTimedOut, _httpRequestMessage.RequestUri, _timeoutHelper.OriginalTimeout)));
Expand Down Expand Up @@ -719,11 +737,12 @@ public async Task SendRequestAsync(Message message, TimeoutHelper timeoutHelper)

private void Cleanup()
{
s_cancelCts(_httpSendCts);

if (_httpRequestMessage != null)
{
var httpRequestMessageSnapshot = _httpRequestMessage;
_httpRequestMessage = null;
_timeoutHelper.CancelCancellationToken(false);
TryCompleteHttpRequest(httpRequestMessageSnapshot);
httpRequestMessageSnapshot.Dispose();
}
Expand Down Expand Up @@ -752,7 +771,8 @@ public async Task<Message> ReceiveReplyAsync(TimeoutHelper timeoutHelper)
}
catch (OperationCanceledException)
{
if (_timeoutHelper.CancellationToken.IsCancellationRequested)
var cancelToken = _timeoutHelper.GetCancellationToken();
if (cancelToken.IsCancellationRequested)
{
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException(SR.Format(
SR.HttpResponseTimedOut, _httpRequestMessage.RequestUri, timeoutHelper.OriginalTimeout)));
Expand Down Expand Up @@ -976,7 +996,8 @@ private async Task SendPreauthenticationHeadRequestIfNeeded()
RequestUri = requestUri
};

await _httpClient.SendAsync(headHttpRequestMessage, _timeoutHelper.CancellationToken);
var cancelToken = await _timeoutHelper.GetCancellationTokenAsync();
await _httpClient.SendAsync(headHttpRequestMessage, cancelToken);
}

private bool AuthenticationSchemeMayRequireResend()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ public override int Read(byte[] buffer, int offset, int count)
return ReadAsyncInternal(buffer, offset, count, CancellationToken.None).WaitForCompletion();
}

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// Supporting a passed in cancellationToken as well as honoring the timeout token in this class would require
// creating a linked token source on every call which is extra allocation and needs disposal. As this is an
// internal classs, it's okay to add this extra constraint to usage of this method.
Contract.Assert(!cancellationToken.CanBeCanceled, "cancellationToken shouldn't be cancellable");
return base.ReadAsync(buffer, offset, count, _timeoutHelper.CancellationToken);
var cancelToken = await _timeoutHelper.GetCancellationTokenAsync();
return await base.ReadAsync(buffer, offset, count, cancelToken);
}

private async Task<int> ReadAsyncInternal(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
Expand All @@ -53,13 +54,14 @@ public override void Write(byte[] buffer, int offset, int count)
WriteAsyncInternal(buffer, offset, count, CancellationToken.None).WaitForCompletion();
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// Supporting a passed in cancellationToken as well as honoring the timeout token in this class would require
// creating a linked token source on every call which is extra allocation and needs disposal. As this is an
// internal classs, it's okay to add this extra constraint to usage of this method.
Contract.Assert(!cancellationToken.CanBeCanceled, "cancellationToken shouldn't be cancellable");
return base.WriteAsync(buffer, offset, count, _timeoutHelper.CancellationToken);
var cancelToken = await _timeoutHelper.GetCancellationTokenAsync();
await base.WriteAsync(buffer, offset, count, cancelToken);
}

private async Task WriteAsyncInternal(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
Expand All @@ -74,7 +76,6 @@ protected override void Dispose(bool disposing)
{
if (disposing)
{
_timeoutHelper.Dispose();
_timeoutHelper = default(TimeoutHelper);
}

Expand Down
Loading

0 comments on commit 24e6cdd

Please sign in to comment.