Skip to content

Commit

Permalink
Merge pull request #469 from dotnet/revert-427-master
Browse files Browse the repository at this point in the history
Revert "Allow caller to await on TimeoutHelper.CancelationToken"
  • Loading branch information
roncain committed Oct 26, 2015
2 parents 24e6cdd + 57aedb6 commit c78ef64
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,54 +8,69 @@

namespace System.Runtime
{
public struct TimeoutHelper
public struct TimeoutHelper : IDisposable
{
public static readonly TimeSpan MaxWait = TimeSpan.FromMilliseconds(Int32.MaxValue);
private static readonly CancellationToken s_precancelledToken = new CancellationToken(true);

private bool _cancellationTokenInitialized;
private bool _deadlineSet;

private CancellationToken _cancellationToken;
private CancellationTokenSource _cts;
private DateTime _deadline;
private TimeSpan _originalTimeout;
public static readonly TimeSpan MaxWait = TimeSpan.FromMilliseconds(Int32.MaxValue);
private static Action<object> s_cancelOnTimeout = state => ((TimeoutHelper)state)._cts.Cancel();

public TimeoutHelper(TimeSpan timeout)
{
Contract.Assert(timeout >= TimeSpan.Zero, "timeout must be non-negative");

_cancellationTokenInitialized = false;
_cts = null;
_originalTimeout = timeout;
_deadline = DateTime.MaxValue;
_deadlineSet = (timeout == TimeSpan.MaxValue);
}

public CancellationToken GetCancellationToken()
// No locks as we expect this class to be used linearly.
// If another CancellationTokenSource is created, we might have a CancellationToken outstanding
// that isn't cancelled if _cts.Cancel() is called. This happens only on the Abort paths, so it's not an issue.
private void InitializeCancellationToken(TimeSpan timeout)
{
return GetCancellationTokenAsync().Result;
if (timeout == TimeSpan.MaxValue || timeout == Timeout.InfiniteTimeSpan)
{
_cancellationToken = CancellationToken.None;
}
else if (timeout > TimeSpan.Zero)
{
_cts = new CancellationTokenSource();
_cancellationToken = _cts.Token;
TimeoutTokenSource.FromTimeout((int)timeout.TotalMilliseconds).Register(s_cancelOnTimeout, this);
}
else
{
_cancellationToken = new CancellationToken(true);
}
_cancellationTokenInitialized = true;
}

public async Task<CancellationToken> GetCancellationTokenAsync()
public CancellationToken CancellationToken
{
if (!_cancellationTokenInitialized)
get
{
var timeout = RemainingTime();
if (timeout >= MaxWait || timeout == Timeout.InfiniteTimeSpan)
{
_cancellationToken = CancellationToken.None;
}
else if (timeout > TimeSpan.Zero)
if (!_cancellationTokenInitialized)
{
_cancellationToken = await TimeoutTokenSource.FromTimeoutAsync((int)timeout.TotalMilliseconds);
InitializeCancellationToken(this.RemainingTime());
}
else
{
_cancellationToken = s_precancelledToken;
}
_cancellationTokenInitialized = true;
return _cancellationToken;
}
}

return _cancellationToken;
public void CancelCancellationToken(bool throwOnFirstException = false)
{
if (_cts != null)
{
_cts.Cancel(throwOnFirstException);
}
}

public TimeSpan OriginalTimeout
Expand Down Expand Up @@ -179,6 +194,16 @@ private void SetDeadline()
_deadlineSet = true;
}

public void Dispose()
{
if (_cancellationTokenInitialized && _cts !=null)
{
_cts.Dispose();
_cancellationTokenInitialized = false;
_cancellationToken = default(CancellationToken);
}
}

public static void ThrowIfNegativeArgument(TimeSpan timeout)
{
ThrowIfNegativeArgument(timeout, "timeout");
Expand Down Expand Up @@ -235,29 +260,9 @@ internal static TimeoutException CreateEnterTimedOutException(TimeSpan timeout)
/// </summary>
internal static class TimeoutTokenSource
{
/// <summary>
/// These are constants use to calculate timeout coalescing, for more description see method FromTimeoutAsync
/// </summary>
private const int CoalescingFactor = 15;
private const int GranularityFactor = 2000;
private const int SegmentationFactor = CoalescingFactor * GranularityFactor;

private const int COALESCING_SPAN_MS = 15;
private static readonly ConcurrentDictionary<long, Task<CancellationToken>> s_tokenCache =
new ConcurrentDictionary<long, Task<CancellationToken>>();
private static readonly Action<object> s_deregisterToken = (object state) =>
{
var args = (Tuple<long, CancellationTokenSource>)state;
Task<CancellationToken> ignored;
try
{
s_tokenCache.TryRemove(args.Item1, out ignored);
}
finally
{
args.Item2.Dispose();
}
};

public static CancellationToken FromTimeout(int millisecondsTimeout)
{
Expand All @@ -273,25 +278,10 @@ public static Task<CancellationToken> FromTimeoutAsync(int millisecondsTimeout)
throw new ArgumentOutOfRangeException("Invalid millisecondsTimeout value " + millisecondsTimeout);
}


// To prevent s_tokenCache growing too large, we have to adjust the granularity of the our coalesce depending
// on the value of millisecondsTimeout. The coalescing span scales proportionally with millisecondsTimeout which
// would garentee constant s_tokenCache size in the case where similar millisecondsTimeout values are accepted.
// If the method is given a wildly different millisecondsTimeout values all the time, the dictionary would still
// only grow logarithmically with respect to the range of the input values
uint currentTime = (uint)Environment.TickCount;
long targetTime = millisecondsTimeout + currentTime;

// Formula for our coalescing span:
// Divide millisecondsTimeout by SegmentationFactor and take the highest bit and then multiply CoalescingFactor back
var segmentValue = millisecondsTimeout / SegmentationFactor;
var coalescingSpanMs = CoalescingFactor;
while (segmentValue > 0)
{
segmentValue >>= 1;
coalescingSpanMs <<= 1;
}
targetTime = ((targetTime + (coalescingSpanMs - 1)) / coalescingSpanMs) * coalescingSpanMs;
// round the targetTime up to the next closest 15ms
targetTime = ((targetTime + (COALESCING_SPAN_MS - 1)) / COALESCING_SPAN_MS) * COALESCING_SPAN_MS;

Task<CancellationToken> tokenTask;

Expand All @@ -304,11 +294,13 @@ public static Task<CancellationToken> FromTimeoutAsync(int millisecondsTimeout)
{
// Since this thread was successful reserving a spot in the cache, it would be the only thread
// that construct the CancellationTokenSource
var tokenSource = new CancellationTokenSource((int)(targetTime - currentTime));
var token = tokenSource.Token;
var token = new CancellationTokenSource((int)(targetTime - currentTime)).Token;

// Clean up cache when Token is canceled
token.Register(s_deregisterToken, Tuple.Create(targetTime, tokenSource));
token.Register(t => {
Task<CancellationToken> ignored;
s_tokenCache.TryRemove((long)t, out ignored);
}, targetTime);

// set the result so other thread may observe the token, and return
tcs.TrySetResult(token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ public override async Task<WebSocket> CreateWebSocketAsync(Uri address, WebHeade
webSocket.Options.SetRequestHeader(header, headers[header]);
}

var cancelToken = await timeoutHelper.GetCancellationTokenAsync();
await webSocket.ConnectAsync(address, cancelToken);
await webSocket.ConnectAsync(address, timeoutHelper.CancellationToken);
return webSocket;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,18 +617,6 @@ internal virtual void OnHttpRequestCompleted(HttpRequestMessage request)

internal class HttpClientChannelAsyncRequest : IAsyncRequest
{
private static readonly Action<object> s_cancelCts = state =>
{
try
{
((CancellationTokenSource)state).Cancel();
}
catch (ObjectDisposedException)
{
// ignore
}
};
private HttpClientRequestChannel _channel;
private HttpChannelFactory<IRequestChannel> _factory;
private EndpointAddress _to;
Expand All @@ -639,7 +627,6 @@ internal class HttpClientChannelAsyncRequest : IAsyncRequest
private TimeoutHelper _timeoutHelper;
private int _httpRequestCompleted;
private HttpClient _httpClient;
private readonly CancellationTokenSource _httpSendCts;

public HttpClientChannelAsyncRequest(HttpClientRequestChannel channel)
{
Expand All @@ -648,7 +635,6 @@ public HttpClientChannelAsyncRequest(HttpClientRequestChannel channel)
_via = channel.Via;
_factory = channel.Factory;
_httpClient = _factory.GetHttpClient();
_httpSendCts = new CancellationTokenSource();
}

public async Task SendRequestAsync(Message message, TimeoutHelper timeoutHelper)
Expand Down Expand Up @@ -687,13 +673,9 @@ public async Task SendRequestAsync(Message message, TimeoutHelper timeoutHelper)

bool success = false;

var cancelTokenTask = _timeoutHelper.GetCancellationTokenAsync();

try
{
var timeoutToken = await cancelTokenTask;
timeoutToken.Register(s_cancelCts, _httpSendCts);
_httpResponseMessage = await _httpClient.SendAsync(_httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, _httpSendCts.Token);
_httpResponseMessage = await _httpClient.SendAsync(_httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, _timeoutHelper.CancellationToken);
// As we have the response message and no exceptions have been thrown, the request message has completed it's job.
// Calling Dispose() on the request message to free up resources in HttpContent, but keeping the object around
// as we can still query properties once dispose'd.
Expand All @@ -707,7 +689,7 @@ public async Task SendRequestAsync(Message message, TimeoutHelper timeoutHelper)
}
catch (OperationCanceledException)
{
if (cancelTokenTask.Result.IsCancellationRequested)
if (_timeoutHelper.CancellationToken.IsCancellationRequested)
{
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException(SR.Format(
SR.HttpRequestTimedOut, _httpRequestMessage.RequestUri, _timeoutHelper.OriginalTimeout)));
Expand Down Expand Up @@ -737,12 +719,11 @@ public async Task SendRequestAsync(Message message, TimeoutHelper timeoutHelper)

private void Cleanup()
{
s_cancelCts(_httpSendCts);

if (_httpRequestMessage != null)
{
var httpRequestMessageSnapshot = _httpRequestMessage;
_httpRequestMessage = null;
_timeoutHelper.CancelCancellationToken(false);
TryCompleteHttpRequest(httpRequestMessageSnapshot);
httpRequestMessageSnapshot.Dispose();
}
Expand Down Expand Up @@ -771,8 +752,7 @@ public async Task<Message> ReceiveReplyAsync(TimeoutHelper timeoutHelper)
}
catch (OperationCanceledException)
{
var cancelToken = _timeoutHelper.GetCancellationToken();
if (cancelToken.IsCancellationRequested)
if (_timeoutHelper.CancellationToken.IsCancellationRequested)
{
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException(SR.Format(
SR.HttpResponseTimedOut, _httpRequestMessage.RequestUri, timeoutHelper.OriginalTimeout)));
Expand Down Expand Up @@ -996,8 +976,7 @@ private async Task SendPreauthenticationHeadRequestIfNeeded()
RequestUri = requestUri
};

var cancelToken = await _timeoutHelper.GetCancellationTokenAsync();
await _httpClient.SendAsync(headHttpRequestMessage, cancelToken);
await _httpClient.SendAsync(headHttpRequestMessage, _timeoutHelper.CancellationToken);
}

private bool AuthenticationSchemeMayRequireResend()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ public override int Read(byte[] buffer, int offset, int count)
return ReadAsyncInternal(buffer, offset, count, CancellationToken.None).WaitForCompletion();
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// Supporting a passed in cancellationToken as well as honoring the timeout token in this class would require
// creating a linked token source on every call which is extra allocation and needs disposal. As this is an
// internal classs, it's okay to add this extra constraint to usage of this method.
Contract.Assert(!cancellationToken.CanBeCanceled, "cancellationToken shouldn't be cancellable");
var cancelToken = await _timeoutHelper.GetCancellationTokenAsync();
return await base.ReadAsync(buffer, offset, count, cancelToken);
return base.ReadAsync(buffer, offset, count, _timeoutHelper.CancellationToken);
}

private async Task<int> ReadAsyncInternal(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
Expand All @@ -54,14 +53,13 @@ public override void Write(byte[] buffer, int offset, int count)
WriteAsyncInternal(buffer, offset, count, CancellationToken.None).WaitForCompletion();
}

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// Supporting a passed in cancellationToken as well as honoring the timeout token in this class would require
// creating a linked token source on every call which is extra allocation and needs disposal. As this is an
// internal classs, it's okay to add this extra constraint to usage of this method.
Contract.Assert(!cancellationToken.CanBeCanceled, "cancellationToken shouldn't be cancellable");
var cancelToken = await _timeoutHelper.GetCancellationTokenAsync();
await base.WriteAsync(buffer, offset, count, cancelToken);
return base.WriteAsync(buffer, offset, count, _timeoutHelper.CancellationToken);
}

private async Task WriteAsyncInternal(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
Expand All @@ -76,6 +74,7 @@ protected override void Dispose(bool disposing)
{
if (disposing)
{
_timeoutHelper.Dispose();
_timeoutHelper = default(TimeoutHelper);
}

Expand Down
Loading

0 comments on commit c78ef64

Please sign in to comment.