Skip to content

Commit

Permalink
[Release 1.1] Address MARS TDS header contained errors (dotnet#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
cheenamalhotra committed Apr 7, 2020
1 parent ed113ac commit 04270f8
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2855,15 +2855,16 @@ private bool TryProcessDone(SqlCommand cmd, SqlDataReader reader, ref RunBehavio
}
}

// _attentionSent set by 'SendAttention'
// _pendingData set by e.g. 'TdsExecuteSQLBatch'
// _hasOpenResult always set to true by 'WriteMarsHeader'
//
if (!stateObj.HasPendingData && stateObj.HasOpenResult)
if (!stateObj._attentionSent && !stateObj.HasPendingData && stateObj.HasOpenResult)
{
/*
Debug.Assert(!((sqlTransaction != null && _distributedTransaction != null) ||
(_userStartedLocalTransaction != null && _distributedTransaction != null))
, "ProcessDone - have both distributed and local transactions not null!");
(_userStartedLocalTransaction != null && _distributedTransaction != null))
, "ProcessDone - have both distributed and local transactions not null!");
*/
// WebData 112722

Expand Down Expand Up @@ -8562,7 +8563,7 @@ internal Task TdsExecuteSQLBatch(string text, int timeout, SqlNotificationReques
{
Debug.Assert(!sync, "Should not have gotten a Task when writing in sync mode");

// Need to wait for flush - continuation will unlock the connection
// Need to wait for flush - continuation will unlock the connection
bool taskReleaseConnectionLock = releaseConnectionLock;
releaseConnectionLock = false;
return executeTask.ContinueWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3324,10 +3324,11 @@ private bool TryProcessDone(SqlCommand cmd, SqlDataReader reader, ref RunBehavio
}
}

// _attentionSent set by 'SendAttention'
// _pendingData set by e.g. 'TdsExecuteSQLBatch'
// _hasOpenResult always set to true by 'WriteMarsHeader'
//
if (!stateObj._pendingData && stateObj._hasOpenResult)
if (!stateObj._attentionSent && !stateObj._pendingData && stateObj._hasOpenResult)
{
/*
Debug.Assert(!((sqlTransaction != null && _distributedTransaction != null) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
<Compile Include="ProviderAgnostic\MultipleResultsTest\MultipleResultsTest.cs" />
<Compile Include="ProviderAgnostic\ReaderTest\ReaderTest.cs" />
<Compile Include="SQL\AsyncTest\AsyncTest.cs" />
<Compile Include="SQL\AsyncTest\AsyncCancelledConnectionsTest.cs" />
<Compile Include="SQL\SqlCommand\SqlCommandCompletedTest.cs" />
<Compile Include="SQL\SqlCommand\SqlCommandCancelTest.cs" />
<Compile Include="SQL\ConnectionPoolTest\ConnectionPoolTest.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace Microsoft.Data.SqlClient.ManualTesting.Tests
{
public class AsyncCancelledConnectionsTest
{
private readonly ITestOutputHelper _output;
private const int NumberOfTasks = 100; // How many attempts to poison the connection pool we will try
private const int NumberOfNonPoisoned = 10; // Number of normal requests for each attempt

public AsyncCancelledConnectionsTest(ITestOutputHelper output)
{
this._output = output;
}

[ActiveIssue(490)] // This test seems to fail regularly in pipelines due to deadlocks. But it's still useful for local testing.
[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))]
public void CancelAsyncConnections()
{
string connectionString = DataTestUtility.TCPConnectionString;
_watch = Stopwatch.StartNew();
_random = new Random(4); // chosen via fair dice role.
ParallelLoopResult results = new ParallelLoopResult();
try
{
// Setup a timer so that we can see what is going on while our tasks run
using (new Timer(TimerCallback, state: null, dueTime: TimeSpan.FromSeconds(5), period: TimeSpan.FromSeconds(5)))
{
results = Parallel.For(
fromInclusive: 0,
toExclusive: NumberOfTasks,
(int i) => DoManyAsync(connectionString).GetAwaiter().GetResult());
}
}
catch (Exception ex)
{
_output.WriteLine(ex.ToString());
}
while (!results.IsCompleted)
{
Thread.Sleep(50);
}
DisplaySummary();
foreach (var detail in _exceptionDetails)
{
_output.WriteLine(detail);
}
Assert.Empty(_exceptionDetails);
}

// Display one row every 5'ish seconds
private void TimerCallback(object state)
{
lock (_lockObject)
{
DisplaySummary();
}
}

private void DisplaySummary()
{
int count;
lock (_exceptionDetails)
{
count = _exceptionDetails.Count;
}

_output.WriteLine($"{_watch.Elapsed} {_continue} Started:{_start} Done:{_done} InFlight:{_inFlight} RowsRead:{_rowsRead} ResultRead:{_resultRead} PoisonedEnded:{_poisonedEnded} nonPoisonedExceptions:{_nonPoisonedExceptions} PoisonedCleanupExceptions:{_poisonCleanUpExceptions} Count:{count} Found:{_found}");
}

// This is the the main body that our Tasks run
private async Task DoManyAsync(string connectionString)
{
Interlocked.Increment(ref _start);
Interlocked.Increment(ref _inFlight);

// First poison
await DoOneAsync(connectionString, poison: true);

for (int i = 0; i < NumberOfNonPoisoned && _continue; i++)
{
// now run some without poisoning
await DoOneAsync(connectionString);
}

Interlocked.Decrement(ref _inFlight);
Interlocked.Increment(ref _done);
}

// This will do our work, open a connection, and run a query (that returns 4 results sets)
// if we are poisoning we will
// 1 - Interject some sleeps in the sql statement so that it will run long enough that we can cancel it
// 2 - Setup a time bomb task that will cancel the command a random amount of time later
private async Task DoOneAsync(string connectionString, bool poison = false)
{
try
{
using (var connection = new SqlConnection(connectionString))
{
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 4; i++)
{
builder.AppendLine("SELECT name FROM sys.tables");
if (poison && i < 3)
{
builder.AppendLine("WAITFOR DELAY '00:00:01'");
}
}

int rowsRead = 0;
int resultRead = 0;

try
{
await connection.OpenAsync();
using (var command = connection.CreateCommand())
{
Task timeBombTask = default;
try
{
// Setup our time bomb
if (poison)
{
timeBombTask = TimeBombAsync(command);
}

command.CommandText = builder.ToString();

// Attempt to read all of the data
using (var reader = await command.ExecuteReaderAsync())
{
try
{
do
{
resultRead++;
while (await reader.ReadAsync() && _continue)
{
rowsRead++;
}
}
while (await reader.NextResultAsync() && _continue);
}
catch when (poison)
{
// This looks a little strange, we failed to read above so this should fail too
// But consider the case where this code is elsewhere (in the Dispose method of a class holding this logic)
try
{
while (await reader.NextResultAsync())
{
}
}
catch
{
Interlocked.Increment(ref _poisonCleanUpExceptions);
}

throw;
}
}
}
finally
{
// Make sure to clean up our time bomb
// It is unlikely, but the timebomb may get delayed in the Task Queue
// And we don't want it running after we dispose the command
if (timeBombTask != default)
{
await timeBombTask;
}
}
}
}
finally
{
Interlocked.Add(ref _rowsRead, rowsRead);
Interlocked.Add(ref _resultRead, resultRead);
if (poison)
{
Interlocked.Increment(ref _poisonedEnded);
}
}
}
}
catch (Exception ex)
{
if (!poison)
{
Interlocked.Increment(ref _nonPoisonedExceptions);

string details = ex.ToString();
details = details.Substring(0, Math.Min(200, details.Length));
lock (_exceptionDetails)
{
_exceptionDetails.Add(details);
}
}

if (ex.Message.Contains("The MARS TDS header contained errors."))
{
_continue = false;
if (_found == 0) // This check is not really safe we may list more than one.
{
lock (_lockObject)
{
// You will notice that poison will be likely be false here, it is the normal commands that suffer
// Once we have successfully poisoned the connection pool, we may start to see some other request to poison fail just like the normal requests
_output.WriteLine($"{poison} {DateTime.UtcNow.ToString("O")}");
_output.WriteLine(ex.ToString());
}
}
Interlocked.Increment(ref _found);
}
}
}

private async Task TimeBombAsync(SqlCommand command)
{
await SleepAsync(100, 3000);
command.Cancel();
}

private async Task SleepAsync(int minMs, int maxMs)
{
int delayMs;
lock (_random)
{
delayMs = _random.Next(minMs, maxMs);
}
await Task.Delay(delayMs);
}

private Stopwatch _watch;

private int _inFlight;
private int _start;
private int _done;
private int _rowsRead;
private int _resultRead;
private int _nonPoisonedExceptions;
private int _poisonedEnded;
private int _poisonCleanUpExceptions;
private bool _continue = true;
private int _found;
private Random _random;
private object _lockObject = new object();

private HashSet<string> _exceptionDetails = new HashSet<string>();
}
}

0 comments on commit 04270f8

Please sign in to comment.