Skip to content

Commit

Permalink
Perf | SqlClient Managed networking improvements (#173)
Browse files Browse the repository at this point in the history
  • Loading branch information
cheenamalhotra authored Dec 5, 2019
1 parent 2a8f998 commit 521a465
Show file tree
Hide file tree
Showing 13 changed files with 180 additions and 353 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ internal abstract class SNIHandle
/// </summary>
public abstract Guid ConnectionId { get; }

public virtual int ReserveHeaderSize => 0;
#if DEBUG
/// <summary>
/// Test handle for killing underlying connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public uint SendAsync(SNIPacket packet, SNIAsyncCallback callback)
/// <returns>SNI error code</returns>
public uint ReceiveAsync(ref SNIPacket packet)
{
if (packet != null)
{
packet.Release();
packet = null;
}

lock (this)
{
return _lowerHandle.ReceiveAsync(ref packet);
Expand Down Expand Up @@ -137,7 +143,7 @@ public void HandleReceiveError(SNIPacket packet)
handle.HandleReceiveError(packet);
}
}
packet?.Dispose();
packet?.Release();
}

/// <summary>
Expand Down Expand Up @@ -187,8 +193,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

if (bytesTaken == 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand All @@ -204,7 +208,7 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)
_currentHeader.Read(_headerBytes);

_dataBytesLeft = (int)_currentHeader.length;
_currentPacket = new SNIPacket((int)_currentHeader.length);
_currentPacket = new SNIPacket(headerSize: 0, dataSize: (int)_currentHeader.length);
}

currentHeader = _currentHeader;
Expand All @@ -219,8 +223,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

if (_dataBytesLeft > 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand Down Expand Up @@ -276,8 +278,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)
{
if (packet.DataLeft == 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Microsoft.Data.SqlClient.SNI
/// <summary>
/// MARS handle
/// </summary>
internal class SNIMarsHandle : SNIHandle
internal sealed class SNIMarsHandle : SNIHandle
{
private const uint ACK_THRESHOLD = 2;

Expand All @@ -37,24 +37,14 @@ internal class SNIMarsHandle : SNIHandle
/// <summary>
/// Connection ID
/// </summary>
public override Guid ConnectionId
{
get
{
return _connectionId;
}
}
public override Guid ConnectionId => _connectionId;

/// <summary>
/// Handle status
/// </summary>
public override uint Status
{
get
{
return _status;
}
}
public override uint Status => _status;

public override int ReserveHeaderSize => SNISMUXHeader.HEADER_LENGTH;

/// <summary>
/// Dispose object
Expand Down Expand Up @@ -93,49 +83,45 @@ public SNIMarsHandle(SNIMarsConnection connection, ushort sessionId, object call
/// <param name="flags">SMUX header flags</param>
private void SendControlPacket(SNISMUXFlags flags)
{
Span<byte> headerBytes = stackalloc byte[SNISMUXHeader.HEADER_LENGTH];
SNIPacket packet = new SNIPacket(headerSize: SNISMUXHeader.HEADER_LENGTH, dataSize: 0);

lock (this)
{
GetSMUXHeaderBytes(0, flags, headerBytes);
SetupSMUXHeader(0, flags);
_currentHeader.Write(packet.GetHeaderBuffer(SNISMUXHeader.HEADER_LENGTH));
packet.SetHeaderActive();
}

SNIPacket packet = new SNIPacket(SNISMUXHeader.HEADER_LENGTH);
packet.AppendData(headerBytes);

_connection.Send(packet);
}

private void GetSMUXHeaderBytes(int length, SNISMUXFlags flags, Span<byte> bytes)
private void SetupSMUXHeader(int length, SNISMUXFlags flags)
{
Debug.Assert(Monitor.IsEntered(this), "must take lock on self before updating mux header");

_currentHeader.SMID = 83;
_currentHeader.flags = (byte)flags;
_currentHeader.sessionId = _sessionId;
_currentHeader.length = (uint)SNISMUXHeader.HEADER_LENGTH + (uint)length;
_currentHeader.sequenceNumber = ((flags == SNISMUXFlags.SMUX_FIN) || (flags == SNISMUXFlags.SMUX_ACK)) ? _sequenceNumber - 1 : _sequenceNumber++;
_currentHeader.highwater = _receiveHighwater;
_receiveHighwaterLastAck = _currentHeader.highwater;

_currentHeader.Write(bytes);
}

/// <summary>
/// Generate a packet with SMUX header
/// </summary>
/// <param name="packet">SNI packet</param>
/// <returns>Encapsulated SNI packet</returns>
private SNIPacket GetSMUXEncapsulatedPacket(SNIPacket packet)
/// <returns>The packet with the SMUx header set.</returns>
private SNIPacket SetPacketSMUXHeader(SNIPacket packet)
{
uint xSequenceNumber = _sequenceNumber;
Span<byte> header = stackalloc byte[SNISMUXHeader.HEADER_LENGTH];
GetSMUXHeaderBytes(packet.Length, SNISMUXFlags.SMUX_DATA, header);
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to mux packet without mux reservation");

SNIPacket smuxPacket = new SNIPacket(SNISMUXHeader.HEADER_LENGTH + packet.Length);
smuxPacket.AppendData(header);
smuxPacket.AppendPacket(packet);
packet.Dispose();
SetupSMUXHeader(packet.Length, SNISMUXFlags.SMUX_DATA);
_currentHeader.Write(packet.GetHeaderBuffer(SNISMUXHeader.HEADER_LENGTH));
packet.SetHeaderActive();

return smuxPacket;
return packet;
}

/// <summary>
Expand All @@ -145,6 +131,8 @@ private SNIPacket GetSMUXEncapsulatedPacket(SNIPacket packet)
/// <returns>SNI error code</returns>
public override uint Send(SNIPacket packet)
{
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to send muxed packet without mux reservation in Send");

while (true)
{
lock (this)
Expand All @@ -163,9 +151,12 @@ public override uint Send(SNIPacket packet)
}
}

SNIPacket encapsulatedPacket = GetSMUXEncapsulatedPacket(packet);

return _connection.Send(encapsulatedPacket);
SNIPacket muxedPacket = null;
lock (this)
{
muxedPacket = SetPacketSMUXHeader(packet);
}
return _connection.Send(muxedPacket);
}

/// <summary>
Expand All @@ -176,25 +167,18 @@ public override uint Send(SNIPacket packet)
/// <returns>SNI error code</returns>
private uint InternalSendAsync(SNIPacket packet, SNIAsyncCallback callback)
{
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to send muxed packet without mux reservation in InternalSendAsync");

lock (this)
{
if (_sequenceNumber >= _sendHighwater)
{
return TdsEnums.SNI_QUEUE_FULL;
}

SNIPacket encapsulatedPacket = GetSMUXEncapsulatedPacket(packet);

if (callback != null)
{
encapsulatedPacket.SetCompletionCallback(callback);
}
else
{
encapsulatedPacket.SetCompletionCallback(HandleSendComplete);
}

return _connection.SendAsync(encapsulatedPacket, callback);
SNIPacket muxedPacket = SetPacketSMUXHeader(packet);
muxedPacket.SetCompletionCallback(callback ?? HandleSendComplete);
return _connection.SendAsync(muxedPacket, callback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Microsoft.Data.SqlClient.SNI
/// <summary>
/// Named Pipe connection handle
/// </summary>
internal class SNINpHandle : SNIHandle
internal sealed class SNINpHandle : SNIHandle
{
internal const string DefaultPipePath = @"sql\query"; // e.g. \\HOSTNAME\pipe\sql\query
private const int MAX_PIPE_INSTANCES = 255;
Expand All @@ -26,6 +26,7 @@ internal class SNINpHandle : SNIHandle
private Stream _stream;
private NamedPipeClientStream _pipeStream;
private SslOverTdsStream _sslOverTdsStream;

private SslStream _sslStream;
private SNIAsyncCallback _receiveCallback;
private SNIAsyncCallback _sendCallback;
Expand Down Expand Up @@ -150,7 +151,7 @@ public override uint Receive(out SNIPacket packet, int timeout)
packet = null;
try
{
packet = new SNIPacket(_bufferSize);
packet = new SNIPacket(headerSize: 0, dataSize: _bufferSize);
packet.ReadFromStream(_stream);

if (packet.Length == 0)
Expand All @@ -174,8 +175,8 @@ public override uint Receive(out SNIPacket packet, int timeout)

public override uint ReceiveAsync(ref SNIPacket packet)
{
packet = new SNIPacket(_bufferSize);

packet = new SNIPacket(headerSize: 0, dataSize: _bufferSize);
try
{
packet.ReadFromStreamAsync(_stream, _receiveCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<
bool error = false;
try
{
packet._length = await valueTask.ConfigureAwait(false);
if (packet._length == 0)
packet._dataLength = await valueTask.ConfigureAwait(false);
if (packet._dataLength == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
Expand All @@ -45,13 +45,13 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<
cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, 0, _capacity), CancellationToken.None);
ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, _headerLength, _dataCapacity), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
_length = vt.Result;
_dataLength = vt.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
if (_dataLength > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

Expand All @@ -61,7 +61,7 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<
else
{
// Avoid consuming the same instance twice.
vt = new ValueTask<int>(_length);
vt = new ValueTask<int>(_dataLength);
}
}

Expand Down Expand Up @@ -96,11 +96,11 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfter)
{
packet.Dispose();
packet.Release();
}
}

ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, 0, _length), CancellationToken.None);
ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, _headerLength, _dataLength), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
Expand All @@ -111,7 +111,7 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfterWriteAsync)
{
Dispose();
Release();
}

// Completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int>
bool error = false;
try
{
packet._length = await task.ConfigureAwait(false);
if (packet._length == 0)
packet._dataLength = await task.ConfigureAwait(false);
if (packet._dataLength == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
Expand All @@ -45,13 +45,13 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int>
cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

Task<int> t = stream.ReadAsync(_data, 0, _capacity, CancellationToken.None);
Task<int> t = stream.ReadAsync(_data, _headerLength, _dataCapacity, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
_length = t.Result;
_dataLength = t.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
if (_dataLength > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

Expand Down Expand Up @@ -91,11 +91,11 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfter)
{
packet.Dispose();
packet.Release();
}
}

Task t = stream.WriteAsync(_data, 0, _length, CancellationToken.None);
Task t = stream.WriteAsync(_data, _headerLength, _dataLength, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
Expand All @@ -106,7 +106,7 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfterWriteAsync)
{
Dispose();
Release();
}

// Completed
Expand Down
Loading

0 comments on commit 521a465

Please sign in to comment.