Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
cheenamalhotra committed Sep 3, 2019
1 parent 9443f0c commit b45159b
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 361 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ internal abstract class SNIHandle
/// </summary>
public abstract Guid ConnectionId { get; }

public virtual int ReserveHeaderSize => 0;
#if DEBUG
/// <summary>
/// Test handle for killing underlying connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public uint SendAsync(SNIPacket packet, SNIAsyncCallback callback)
/// <returns>SNI error code</returns>
public uint ReceiveAsync(ref SNIPacket packet)
{
if (packet != null)
{
packet.Release();
packet = null;
}

lock (this)
{
return _lowerHandle.ReceiveAsync(ref packet);
Expand Down Expand Up @@ -134,7 +140,7 @@ public void HandleReceiveError(SNIPacket packet)
{
handle.HandleReceiveError(packet);
}
packet?.Dispose();
packet?.Release();
}

/// <summary>
Expand Down Expand Up @@ -184,8 +190,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

if (bytesTaken == 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand All @@ -201,7 +205,7 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)
_currentHeader.Read(_headerBytes);

_dataBytesLeft = (int)_currentHeader.length;
_currentPacket = new SNIPacket((int)_currentHeader.length);
_currentPacket = new SNIPacket(headerSize: 0, dataSize: (int)_currentHeader.length);
}

currentHeader = _currentHeader;
Expand All @@ -216,8 +220,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

if (_dataBytesLeft > 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand Down Expand Up @@ -273,8 +275,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)
{
if (packet.DataLeft == 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Microsoft.Data.SqlClient.SNI
/// <summary>
/// MARS handle
/// </summary>
internal class SNIMarsHandle : SNIHandle
internal sealed class SNIMarsHandle : SNIHandle
{
private const uint ACK_THRESHOLD = 2;

Expand All @@ -37,24 +37,14 @@ internal class SNIMarsHandle : SNIHandle
/// <summary>
/// Connection ID
/// </summary>
public override Guid ConnectionId
{
get
{
return _connectionId;
}
}
public override Guid ConnectionId => _connectionId;

/// <summary>
/// Handle status
/// </summary>
public override uint Status
{
get
{
return _status;
}
}
public override uint Status => _status;

public override int ReserveHeaderSize => SNISMUXHeader.HEADER_LENGTH;

/// <summary>
/// Dispose object
Expand Down Expand Up @@ -93,49 +83,45 @@ public SNIMarsHandle(SNIMarsConnection connection, ushort sessionId, object call
/// <param name="flags">SMUX header flags</param>
private void SendControlPacket(SNISMUXFlags flags)
{
Span<byte> headerBytes = stackalloc byte[SNISMUXHeader.HEADER_LENGTH];
SNIPacket packet = new SNIPacket(headerSize: SNISMUXHeader.HEADER_LENGTH, dataSize: 0);

lock (this)
{
GetSMUXHeaderBytes(0, flags, headerBytes);
SetupSMUXHeader(0, flags);
_currentHeader.Write(packet.GetHeaderBuffer(SNISMUXHeader.HEADER_LENGTH));
packet.SetHeaderActive();
}

SNIPacket packet = new SNIPacket(SNISMUXHeader.HEADER_LENGTH);
packet.AppendData(headerBytes);

_connection.Send(packet);
}

private void GetSMUXHeaderBytes(int length, SNISMUXFlags flags, Span<byte> bytes)
private void SetupSMUXHeader(int length, SNISMUXFlags flags)
{
Debug.Assert(Monitor.IsEntered(this), "must take lock on self before updating mux header");

_currentHeader.SMID = 83;
_currentHeader.flags = (byte)flags;
_currentHeader.sessionId = _sessionId;
_currentHeader.length = (uint)SNISMUXHeader.HEADER_LENGTH + (uint)length;
_currentHeader.sequenceNumber = ((flags == SNISMUXFlags.SMUX_FIN) || (flags == SNISMUXFlags.SMUX_ACK)) ? _sequenceNumber - 1 : _sequenceNumber++;
_currentHeader.highwater = _receiveHighwater;
_receiveHighwaterLastAck = _currentHeader.highwater;

_currentHeader.Write(bytes);
}

/// <summary>
/// Generate a packet with SMUX header
/// </summary>
/// <param name="packet">SNI packet</param>
/// <returns>Encapsulated SNI packet</returns>
private SNIPacket GetSMUXEncapsulatedPacket(SNIPacket packet)
/// <returns>The packet with the SMUx header set.</returns>
private SNIPacket SetPacketSMUXHeader(SNIPacket packet)
{
uint xSequenceNumber = _sequenceNumber;
Span<byte> header = stackalloc byte[SNISMUXHeader.HEADER_LENGTH];
GetSMUXHeaderBytes(packet.Length, SNISMUXFlags.SMUX_DATA, header);
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to mux packet without mux reservation");

SNIPacket smuxPacket = new SNIPacket(SNISMUXHeader.HEADER_LENGTH + packet.Length);
smuxPacket.AppendData(header);
smuxPacket.AppendPacket(packet);
packet.Dispose();
SetupSMUXHeader(packet.Length, SNISMUXFlags.SMUX_DATA);
_currentHeader.Write(packet.GetHeaderBuffer(SNISMUXHeader.HEADER_LENGTH));
packet.SetHeaderActive();

return smuxPacket;
return packet;
}

/// <summary>
Expand All @@ -145,6 +131,8 @@ private SNIPacket GetSMUXEncapsulatedPacket(SNIPacket packet)
/// <returns>SNI error code</returns>
public override uint Send(SNIPacket packet)
{
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to send muxed packet without mux reservation in Send");

while (true)
{
lock (this)
Expand All @@ -163,9 +151,12 @@ public override uint Send(SNIPacket packet)
}
}

SNIPacket encapsulatedPacket = GetSMUXEncapsulatedPacket(packet);

return _connection.Send(encapsulatedPacket);
SNIPacket muxedPacket = null;
lock (this)
{
muxedPacket = SetPacketSMUXHeader(packet);
}
return _connection.Send(muxedPacket);
}

/// <summary>
Expand All @@ -176,25 +167,18 @@ public override uint Send(SNIPacket packet)
/// <returns>SNI error code</returns>
private uint InternalSendAsync(SNIPacket packet, SNIAsyncCallback callback)
{
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to send muxed packet without mux reservation in InternalSendAsync");

lock (this)
{
if (_sequenceNumber >= _sendHighwater)
{
return TdsEnums.SNI_QUEUE_FULL;
}

SNIPacket encapsulatedPacket = GetSMUXEncapsulatedPacket(packet);

if (callback != null)
{
encapsulatedPacket.SetCompletionCallback(callback);
}
else
{
encapsulatedPacket.SetCompletionCallback(HandleSendComplete);
}

return _connection.SendAsync(encapsulatedPacket, callback);
SNIPacket muxedPacket = SetPacketSMUXHeader(packet);
muxedPacket.SetCompletionCallback(callback ?? HandleSendComplete);
return _connection.SendAsync(muxedPacket, callback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Microsoft.Data.SqlClient.SNI
/// <summary>
/// Named Pipe connection handle
/// </summary>
internal class SNINpHandle : SNIHandle
internal sealed class SNINpHandle : SNIHandle
{
internal const string DefaultPipePath = @"sql\query"; // e.g. \\HOSTNAME\pipe\sql\query
private const int MAX_PIPE_INSTANCES = 255;
Expand All @@ -26,6 +26,7 @@ internal class SNINpHandle : SNIHandle
private Stream _stream;
private NamedPipeClientStream _pipeStream;
private SslOverTdsStream _sslOverTdsStream;

private SslStream _sslStream;
private SNIAsyncCallback _receiveCallback;
private SNIAsyncCallback _sendCallback;
Expand Down Expand Up @@ -61,13 +62,13 @@ public SNINpHandle(string serverName, string pipeName, long timerExpire, object
_pipeStream.Connect((int)ts.TotalMilliseconds);
}
}
catch(TimeoutException te)
catch (TimeoutException te)
{
SNICommon.ReportSNIError(SNIProviders.NP_PROV, SNICommon.ConnOpenFailedError, te);
_status = TdsEnums.SNI_ERROR;
return;
}
catch(IOException ioe)
catch (IOException ioe)
{
SNICommon.ReportSNIError(SNIProviders.NP_PROV, SNICommon.ConnOpenFailedError, ioe);
_status = TdsEnums.SNI_ERROR;
Expand Down Expand Up @@ -150,7 +151,7 @@ public override uint Receive(out SNIPacket packet, int timeout)
packet = null;
try
{
packet = new SNIPacket(_bufferSize);
packet = new SNIPacket(headerSize: 0, dataSize: _bufferSize);
packet.ReadFromStream(_stream);

if (packet.Length == 0)
Expand All @@ -174,7 +175,7 @@ public override uint Receive(out SNIPacket packet, int timeout)

public override uint ReceiveAsync(ref SNIPacket packet)
{
packet = new SNIPacket(_bufferSize);
packet = new SNIPacket(headerSize: 0, dataSize: _bufferSize);

try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<
bool error = false;
try
{
packet._length = await valueTask.ConfigureAwait(false);
if (packet._length == 0)
packet._dataLength = await valueTask.ConfigureAwait(false);
if (packet._dataLength == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
Expand All @@ -46,13 +46,13 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<
cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, 0, _capacity), CancellationToken.None);
ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, _headerLength, _dataCapacity), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
_length = vt.Result;
_dataLength = vt.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
if (_dataLength > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

Expand Down Expand Up @@ -89,11 +89,11 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfter)
{
packet.Dispose();
packet.Release();
}
}

ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, 0, _length), CancellationToken.None);
ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, _headerLength, _dataLength), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
Expand All @@ -104,7 +104,7 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfterWriteAsync)
{
Dispose();
Release();
}

// Completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int>
bool error = false;
try
{
packet._length = await task.ConfigureAwait(false);
if (packet._length == 0)
packet._dataLength = await task.ConfigureAwait(false);
if (packet._dataLength == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
Expand All @@ -46,13 +46,13 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int>
cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

Task<int> t = stream.ReadAsync(_data, 0, _capacity, CancellationToken.None);
Task<int> t = stream.ReadAsync(_data, _headerLength, _dataCapacity, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
_length = t.Result;
_dataLength = t.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
if (_dataLength > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

Expand Down Expand Up @@ -89,11 +89,11 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfter)
{
packet.Dispose();
packet.Release();
}
}

Task t = stream.WriteAsync(_data, 0, _length, CancellationToken.None);
Task t = stream.WriteAsync(_data, _headerLength, _dataLength, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
Expand All @@ -104,7 +104,7 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfterWriteAsync)
{
Dispose();
Release();
}

// Completed
Expand Down
Loading

0 comments on commit b45159b

Please sign in to comment.