Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

SqlClient managed networking improvements #35363

Merged
merged 10 commits into from
Jun 4, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ internal abstract class SNIHandle
/// </summary>
public abstract Guid ConnectionId { get; }

public virtual bool SMUXEnabled => false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to make it public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't publc, it's a public method on an internal class so it's effectively private to external view. it is however part of the public surface inside the assembly.


#if DEBUG
/// <summary>
/// Test handle for killing underlying connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ public uint SendAsync(SNIPacket packet, SNIAsyncCallback callback)
/// <returns>SNI error code</returns>
public uint ReceiveAsync(ref SNIPacket packet)
{
if (packet != null)
{
packet.Release();
packet = null;
}
lock (this)
{
return _lowerHandle.ReceiveAsync(ref packet);
Expand Down Expand Up @@ -133,7 +138,7 @@ public void HandleReceiveError(SNIPacket packet)
{
handle.HandleReceiveError(packet);
}
packet?.Dispose();
packet?.Release();
}

/// <summary>
Expand Down Expand Up @@ -183,8 +188,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

if (bytesTaken == 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand Down Expand Up @@ -214,8 +217,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

if (_dataBytesLeft > 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand Down Expand Up @@ -271,8 +272,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)
{
if (packet.DataLeft == 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace System.Data.SqlClient.SNI
/// <summary>
/// MARS handle
/// </summary>
internal class SNIMarsHandle : SNIHandle
internal sealed class SNIMarsHandle : SNIHandle
{
private const uint ACK_THRESHOLD = 2;

Expand All @@ -33,27 +33,11 @@ internal class SNIMarsHandle : SNIHandle
private uint _sequenceNumber;
private SNIError _connectionError;

/// <summary>
/// Connection ID
/// </summary>
public override Guid ConnectionId
{
get
{
return _connectionId;
}
}
public override Guid ConnectionId => _connectionId;

/// <summary>
/// Handle status
/// </summary>
public override uint Status
{
get
{
return _status;
}
}
public override uint Status => _status;

public override bool SMUXEnabled => true;

/// <summary>
/// Dispose object
Expand Down Expand Up @@ -93,48 +77,39 @@ public SNIMarsHandle(SNIMarsConnection connection, ushort sessionId, object call
/// <param name="flags">SMUX header flags</param>
private void SendControlPacket(SNISMUXFlags flags)
{
Span<byte> headerBytes = stackalloc byte[SNISMUXHeader.HEADER_LENGTH];
SNIPacket packet = new SNIPacket(0,reserveMuxHeader:true);
lock (this)
{
GetSMUXHeaderBytes(0, flags, headerBytes);
SetupSMUXHeader(0, flags);
packet.SetHeader(_currentHeader);
}

SNIPacket packet = new SNIPacket(SNISMUXHeader.HEADER_LENGTH);
packet.AppendData(headerBytes);

_connection.Send(packet);
}

private void GetSMUXHeaderBytes(int length, SNISMUXFlags flags, Span<byte> bytes)
private void SetupSMUXHeader(int length, SNISMUXFlags flags)
{
Debug.Assert(Monitor.IsEntered(this), "must take lock on self before updating mux header");
_currentHeader.SMID = 83;
_currentHeader.flags = (byte)flags;
_currentHeader.sessionId = _sessionId;
_currentHeader.length = (uint)SNISMUXHeader.HEADER_LENGTH + (uint)length;
_currentHeader.sequenceNumber = ((flags == SNISMUXFlags.SMUX_FIN) || (flags == SNISMUXFlags.SMUX_ACK)) ? _sequenceNumber - 1 : _sequenceNumber++;
_currentHeader.highwater = _receiveHighwater;
_receiveHighwaterLastAck = _currentHeader.highwater;

_currentHeader.Write(bytes);
}

/// <summary>
/// Generate a packet with SMUX header
/// </summary>
/// <param name="packet">SNI packet</param>
/// <returns>Encapsulated SNI packet</returns>
private SNIPacket GetSMUXEncapsulatedPacket(SNIPacket packet)
/// <returns>The packet with the SMUx header set.</returns>
private SNIPacket SetPacketSMUXHeader(SNIPacket packet)
{
uint xSequenceNumber = _sequenceNumber;
Span<byte> header = stackalloc byte[SNISMUXHeader.HEADER_LENGTH];
GetSMUXHeaderBytes(packet.Length, SNISMUXFlags.SMUX_DATA, header);
Debug.Assert(packet.MuxHeaderReserved, "attempting to mux packet without mux reservation");


SNIPacket smuxPacket = new SNIPacket(SNISMUXHeader.HEADER_LENGTH + packet.Length);
smuxPacket.AppendData(header);
smuxPacket.AppendPacket(packet);
packet.Dispose();
return smuxPacket;
SetupSMUXHeader(packet.Length, SNISMUXFlags.SMUX_DATA);
packet.SetHeader(_currentHeader);
return packet;
}

/// <summary>
Expand All @@ -144,6 +119,8 @@ private SNIPacket GetSMUXEncapsulatedPacket(SNIPacket packet)
/// <returns>SNI error code</returns>
public override uint Send(SNIPacket packet)
{
Debug.Assert(packet.MuxHeaderReserved, "attempting to send muxed packet without mux reservation in Send");

while (true)
{
lock (this)
Expand All @@ -161,9 +138,13 @@ public override uint Send(SNIPacket packet)
_ackEvent.Reset();
}
}
SNIPacket encapsulatedPacket = GetSMUXEncapsulatedPacket(packet);

return _connection.Send(encapsulatedPacket);
SNIPacket muxedPacket = null;
lock (this)
{
muxedPacket = SetPacketSMUXHeader(packet);
}
return _connection.Send(muxedPacket);
}

/// <summary>
Expand All @@ -174,25 +155,17 @@ public override uint Send(SNIPacket packet)
/// <returns>SNI error code</returns>
private uint InternalSendAsync(SNIPacket packet, SNIAsyncCallback callback)
{
Debug.Assert(packet.MuxHeaderReserved, "attempting to send muxed packet without mux reservation in InternalSendAsync");
lock (this)
{
if (_sequenceNumber >= _sendHighwater)
{
return TdsEnums.SNI_QUEUE_FULL;
}

SNIPacket encapsulatedPacket = GetSMUXEncapsulatedPacket(packet);

if (callback != null)
{
encapsulatedPacket.SetCompletionCallback(callback);
}
else
{
encapsulatedPacket.SetCompletionCallback(HandleSendComplete);
}

return _connection.SendAsync(encapsulatedPacket, callback);
SNIPacket muxedPacket = SetPacketSMUXHeader(packet);
muxedPacket.SetCompletionCallback(callback??HandleSendComplete);
return _connection.SendAsync(muxedPacket, callback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace System.Data.SqlClient.SNI
/// <summary>
/// Named Pipe connection handle
/// </summary>
internal class SNINpHandle : SNIHandle
internal sealed class SNINpHandle : SNIHandle
{
internal const string DefaultPipePath = @"sql\query"; // e.g. \\HOSTNAME\pipe\sql\query
private const int MAX_PIPE_INSTANCES = 255;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<
cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, 0, _capacity), CancellationToken.None);
ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, _header, _capacity), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
Expand Down Expand Up @@ -88,11 +88,11 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfter)
{
packet.Dispose();
packet.Release();
}
}

ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, 0, _length), CancellationToken.None);
ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, _header, _length), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
Expand All @@ -103,7 +103,7 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfterWriteAsync)
{
Dispose();
Release();
}

// Completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int>
cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

Task<int> t = stream.ReadAsync(_data, 0, _capacity, CancellationToken.None);
Task<int> t = stream.ReadAsync(_data, _header, _capacity, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
Expand Down Expand Up @@ -88,11 +88,11 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfter)
{
packet.Dispose();
packet.Release();
}
}

Task t = stream.WriteAsync(_data, 0, _length, CancellationToken.None);
Task t = stream.WriteAsync(_data, _header, _length, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
Expand All @@ -103,7 +103,7 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfterWriteAsync)
{
Dispose();
Release();
}

// Completed
Expand Down
Loading