Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More optimizations for InFlightRequest #362

Merged
merged 12 commits into from
Jan 30, 2020
56 changes: 56 additions & 0 deletions src/Benchmarks/MicroBenchmarks/InFlightRequestBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Threading;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Jobs;
using NATS.Client.Internals;

namespace MicroBenchmarks
{
[MemoryDiagnoser]
[MarkdownExporterAttribute.GitHub]
[SimpleJob(RuntimeMoniker.Net462)]
[SimpleJob(RuntimeMoniker.NetCoreApp31)]
[GroupBenchmarksBy(BenchmarkLogicalGroupRule.ByCategory)]
[CategoriesColumn]
public class InFlightRequestBenchmark
{
private static void OnCompleted(string _) { }

private static CancellationToken _ct = new CancellationTokenSource().Token;

[Params(0, 999_999)]
public int Timeout { get; set; }

[BenchmarkCategory("DefaultToken")]
[Benchmark]
public string InFlightRequest()
{
var request = new InFlightRequest("a", default, Timeout, OnCompleted);
var id = request.Id;
request.Dispose();
return id;
}
[BenchmarkCategory("ClientToken")]
[Benchmark]
public string InFlightRequestClientToken()
{
var request = new InFlightRequest("a", _ct, Timeout, OnCompleted);
var id = request.Id;
request.Dispose();
return id;
}
}
}
1 change: 1 addition & 0 deletions src/Benchmarks/MicroBenchmarks/MicroBenchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<DebugSymbols>true</DebugSymbols>
<IsPackable>false</IsPackable>
</PropertyGroup>

<PropertyGroup Condition="$(Configuration) == 'Release'">
Expand Down
2 changes: 1 addition & 1 deletion src/Benchmarks/MicroBenchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class Program
{
public static void Main(string[] args)
{
var summary = BenchmarkRunner.Run<NuidBenchmark>();
BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args);
}
}
}
40 changes: 0 additions & 40 deletions src/NATS.Client/Conn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,46 +158,6 @@ public Options Opts
private readonly ConcurrentDictionary<string, InFlightRequest> waitingRequests
= new ConcurrentDictionary<string, InFlightRequest>(StringComparer.OrdinalIgnoreCase);

// Handles in-flight requests when using the new-style request/reply behavior
private sealed class InFlightRequest : IDisposable
{
internal InFlightRequest(string id, CancellationToken token, int timeout, Action<string> onCompleted)
{
this.Id = id;
this.Waiter = new TaskCompletionSource<Msg>();
this.onCompleted = onCompleted;
this.tokenSource = token == CancellationToken.None
? new CancellationTokenSource()
: CancellationTokenSource.CreateLinkedTokenSource(token);

this.tokenRegistration = this.tokenSource.Token.Register(() =>
{
if (timeout > 0)
this.Waiter.TrySetException(new NATSTimeoutException());

this.Waiter.TrySetCanceled();
});

if(timeout > 0)
this.tokenSource.CancelAfter(timeout);
}

public string Id { get; }
public TaskCompletionSource<Msg> Waiter { get; }
public CancellationToken Token => tokenSource.Token;

private readonly Action<string> onCompleted;
private readonly CancellationTokenSource tokenSource;
private CancellationTokenRegistration tokenRegistration;

public void Dispose()
{
this.tokenRegistration.Dispose();
this.tokenSource?.Dispose();
this.onCompleted?.Invoke(this.Id);
}
}

// Prepare protocol messages for efficiency
private byte[] PING_P_BYTES = null;
private int PING_P_BYTES_LEN;
Expand Down
95 changes: 95 additions & 0 deletions src/NATS.Client/Internals/InFlightRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2017-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace NATS.Client.Internals
{
/// <summary>
/// Represents an in-flight request/reply operation.
/// </summary>
/// <remarks>
/// This class is not used when using the legacy request/reply
/// pattern (see <see cref="Options.UseOldRequestStyle"/>).
/// </remarks>
internal sealed class InFlightRequest : IDisposable
{
private readonly Action<string> _onCompleted;
private readonly CancellationTokenSource _tokenSource;
private readonly CancellationTokenRegistration _tokenRegistration;
private readonly CancellationToken _clientProvidedToken;

public readonly string Id;
jasper-d marked this conversation as resolved.
Show resolved Hide resolved
public readonly CancellationToken Token;
public readonly TaskCompletionSource<Msg> Waiter = new TaskCompletionSource<Msg>();

/// <summary>
/// Initializes a new instance of <see cref="InFlightRequest"/> class.
/// </summary>
/// <param name="id">The id associated with the request.</param>
/// <param name="token">The cancellation token used to cancel the request.</param>
/// <param name="timeout">A timeout (ms) after which the request is canceled.</param>
/// <param name="onCompleted">The delegate that will be executed after the request ended.</param>
/// <exception cref="TaskCanceledException">Thrown if the request is cancelled by <paramref name="token"/> before receiving a response.</exception>
/// <exception cref="NATSTimeoutException">Thrown if the request is cancelled because <paramref name="timeout"/> period has elapsed before receiving a response.</exception>
internal InFlightRequest(string id, CancellationToken token, int timeout, Action<string> onCompleted)
{
_onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted));
_clientProvidedToken = token;
Id = id;

if (timeout > 0 && token == default)
{
_tokenSource = new CancellationTokenSource();
Token = _tokenSource.Token;
}
else if (timeout > 0)
{
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
Token = _tokenSource.Token;
}
else
{
Token = token;
}

_tokenRegistration = Token.Register(CancellationCallback, this);

if (timeout > 0)
_tokenSource.CancelAfter(timeout);
}

private static void CancellationCallback(object req)
{
var request = req as InFlightRequest;

if (request._clientProvidedToken.IsCancellationRequested)
request.Waiter.TrySetCanceled();

request.Waiter.TrySetException(new NATSTimeoutException());
}

/// <summary>
/// Releases all resources used by the current instance of the <see cref="InFlightRequest"/>
/// class and invokes the <c>onCompleted</c> delegate.
/// </summary>
public void Dispose()
{
_tokenRegistration.Dispose();
_tokenSource?.Dispose();
_onCompleted.Invoke(Id);
}
}
}
1 change: 0 additions & 1 deletion src/NATS.Client/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#if DEBUG
[assembly: InternalsVisibleTo("UnitTests")]
[assembly: InternalsVisibleTo("MicroBenchmarks")]

#else
[assembly: InternalsVisibleTo("UnitTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd")]
[assembly: InternalsVisibleTo("MicroBenchmarks, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd")]
Expand Down
94 changes: 94 additions & 0 deletions src/Tests/UnitTests/Internals/InFlightRequestTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading;
using System.Threading.Tasks;
using NATS.Client;
using NATS.Client.Internals;
using Xunit;

namespace UnitTests.Internals
{
public class InFlightRequestTests
{
[Fact]
public async Task Timeout_ThrowsNatsTimeoutException()
{
// Arrange
var sut = new InFlightRequest("Foo", default, 1, _ => { });

// Assert
await Assert.ThrowsAsync<NATSTimeoutException>(() => sut.Waiter.Task);
}

[Fact]
public async Task TimeoutWithToken_ThrowsTaskCanceledExcpetion()
{
// Arrange
var cts = new CancellationTokenSource();
var sut = new InFlightRequest("Foo", cts.Token, 1, _ => { });

// Assert
await Assert.ThrowsAsync<NATSTimeoutException>(() => sut.Waiter.Task);
}

[Fact]
public async Task Canceled_ThrowsTaskCanceledExcpetion()
{
// Arrange
var cts = new CancellationTokenSource();
var sut = new InFlightRequest("Foo", cts.Token, 0, _ => { });

// Act
cts.Cancel();

// Assert
await Assert.ThrowsAsync<TaskCanceledException>(() => sut.Waiter.Task);
}

[Fact]
public async Task CanceledWithTimeout_ThrowsTaskCanceledException()
{
// Arrange
var cts = new CancellationTokenSource();
var sut = new InFlightRequest("Foo", cts.Token, int.MaxValue, _ => { });

// Act
cts.Cancel();

// Assert
await Assert.ThrowsAsync<TaskCanceledException>(() => sut.Waiter.Task);
}

[Fact]
public void Dispose_InvokesOnCompletedDelegate()
{
// Arrange
var onCompletedArg = "";
var sut = new InFlightRequest("Foo", default, 0, id => { onCompletedArg = id; });

// Act
sut.Dispose();

// Assert
Assert.Equal("Foo", onCompletedArg);
}

[Fact]
public void Ctor_ThrowsForNullArg()
{
Assert.Throws<ArgumentNullException>("onCompleted", () => new InFlightRequest("Foo", default, 0, null));
}
}
}