From 04270f8cf8475c2d6a41ca56ec01b4be47a49383 Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Mon, 6 Apr 2020 17:56:54 -0700 Subject: [PATCH] [Release 1.1] Address MARS TDS header contained errors (#490) --- .../src/Microsoft/Data/SqlClient/TdsParser.cs | 9 +- .../src/Microsoft/Data/SqlClient/TdsParser.cs | 3 +- ....Data.SqlClient.ManualTesting.Tests.csproj | 1 + .../AsyncCancelledConnectionsTest.cs | 259 ++++++++++++++++++ 4 files changed, 267 insertions(+), 5 deletions(-) create mode 100644 src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs index e530dacc6a..0931d57557 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs @@ -2855,15 +2855,16 @@ private bool TryProcessDone(SqlCommand cmd, SqlDataReader reader, ref RunBehavio } } + // _attentionSent set by 'SendAttention' // _pendingData set by e.g. 'TdsExecuteSQLBatch' // _hasOpenResult always set to true by 'WriteMarsHeader' // - if (!stateObj.HasPendingData && stateObj.HasOpenResult) + if (!stateObj._attentionSent && !stateObj.HasPendingData && stateObj.HasOpenResult) { /* Debug.Assert(!((sqlTransaction != null && _distributedTransaction != null) || - (_userStartedLocalTransaction != null && _distributedTransaction != null)) - , "ProcessDone - have both distributed and local transactions not null!"); + (_userStartedLocalTransaction != null && _distributedTransaction != null)) + , "ProcessDone - have both distributed and local transactions not null!"); */ // WebData 112722 @@ -8562,7 +8563,7 @@ internal Task TdsExecuteSQLBatch(string text, int timeout, SqlNotificationReques { Debug.Assert(!sync, "Should not have gotten a Task when writing in sync mode"); - // Need to wait for flush - continuation will unlock the connection + // Need to wait for flush - continuation will unlock the connection bool taskReleaseConnectionLock = releaseConnectionLock; releaseConnectionLock = false; return executeTask.ContinueWith( diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs index 1c906042f1..165e3eb4bb 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs @@ -3324,10 +3324,11 @@ private bool TryProcessDone(SqlCommand cmd, SqlDataReader reader, ref RunBehavio } } + // _attentionSent set by 'SendAttention' // _pendingData set by e.g. 'TdsExecuteSQLBatch' // _hasOpenResult always set to true by 'WriteMarsHeader' // - if (!stateObj._pendingData && stateObj._hasOpenResult) + if (!stateObj._attentionSent && !stateObj._pendingData && stateObj._hasOpenResult) { /* Debug.Assert(!((sqlTransaction != null && _distributedTransaction != null) || diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/Microsoft.Data.SqlClient.ManualTesting.Tests.csproj b/src/Microsoft.Data.SqlClient/tests/ManualTests/Microsoft.Data.SqlClient.ManualTesting.Tests.csproj index 7a82bfbf08..48f4fd0498 100644 --- a/src/Microsoft.Data.SqlClient/tests/ManualTests/Microsoft.Data.SqlClient.ManualTesting.Tests.csproj +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/Microsoft.Data.SqlClient.ManualTesting.Tests.csproj @@ -93,6 +93,7 @@ + diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs new file mode 100644 index 0000000000..5578e968ce --- /dev/null +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs @@ -0,0 +1,259 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.Data.SqlClient.ManualTesting.Tests +{ + public class AsyncCancelledConnectionsTest + { + private readonly ITestOutputHelper _output; + private const int NumberOfTasks = 100; // How many attempts to poison the connection pool we will try + private const int NumberOfNonPoisoned = 10; // Number of normal requests for each attempt + + public AsyncCancelledConnectionsTest(ITestOutputHelper output) + { + this._output = output; + } + + [ActiveIssue(490)] // This test seems to fail regularly in pipelines due to deadlocks. But it's still useful for local testing. + [ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))] + public void CancelAsyncConnections() + { + string connectionString = DataTestUtility.TCPConnectionString; + _watch = Stopwatch.StartNew(); + _random = new Random(4); // chosen via fair dice role. + ParallelLoopResult results = new ParallelLoopResult(); + try + { + // Setup a timer so that we can see what is going on while our tasks run + using (new Timer(TimerCallback, state: null, dueTime: TimeSpan.FromSeconds(5), period: TimeSpan.FromSeconds(5))) + { + results = Parallel.For( + fromInclusive: 0, + toExclusive: NumberOfTasks, + (int i) => DoManyAsync(connectionString).GetAwaiter().GetResult()); + } + } + catch (Exception ex) + { + _output.WriteLine(ex.ToString()); + } + while (!results.IsCompleted) + { + Thread.Sleep(50); + } + DisplaySummary(); + foreach (var detail in _exceptionDetails) + { + _output.WriteLine(detail); + } + Assert.Empty(_exceptionDetails); + } + + // Display one row every 5'ish seconds + private void TimerCallback(object state) + { + lock (_lockObject) + { + DisplaySummary(); + } + } + + private void DisplaySummary() + { + int count; + lock (_exceptionDetails) + { + count = _exceptionDetails.Count; + } + + _output.WriteLine($"{_watch.Elapsed} {_continue} Started:{_start} Done:{_done} InFlight:{_inFlight} RowsRead:{_rowsRead} ResultRead:{_resultRead} PoisonedEnded:{_poisonedEnded} nonPoisonedExceptions:{_nonPoisonedExceptions} PoisonedCleanupExceptions:{_poisonCleanUpExceptions} Count:{count} Found:{_found}"); + } + + // This is the the main body that our Tasks run + private async Task DoManyAsync(string connectionString) + { + Interlocked.Increment(ref _start); + Interlocked.Increment(ref _inFlight); + + // First poison + await DoOneAsync(connectionString, poison: true); + + for (int i = 0; i < NumberOfNonPoisoned && _continue; i++) + { + // now run some without poisoning + await DoOneAsync(connectionString); + } + + Interlocked.Decrement(ref _inFlight); + Interlocked.Increment(ref _done); + } + + // This will do our work, open a connection, and run a query (that returns 4 results sets) + // if we are poisoning we will + // 1 - Interject some sleeps in the sql statement so that it will run long enough that we can cancel it + // 2 - Setup a time bomb task that will cancel the command a random amount of time later + private async Task DoOneAsync(string connectionString, bool poison = false) + { + try + { + using (var connection = new SqlConnection(connectionString)) + { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < 4; i++) + { + builder.AppendLine("SELECT name FROM sys.tables"); + if (poison && i < 3) + { + builder.AppendLine("WAITFOR DELAY '00:00:01'"); + } + } + + int rowsRead = 0; + int resultRead = 0; + + try + { + await connection.OpenAsync(); + using (var command = connection.CreateCommand()) + { + Task timeBombTask = default; + try + { + // Setup our time bomb + if (poison) + { + timeBombTask = TimeBombAsync(command); + } + + command.CommandText = builder.ToString(); + + // Attempt to read all of the data + using (var reader = await command.ExecuteReaderAsync()) + { + try + { + do + { + resultRead++; + while (await reader.ReadAsync() && _continue) + { + rowsRead++; + } + } + while (await reader.NextResultAsync() && _continue); + } + catch when (poison) + { + // This looks a little strange, we failed to read above so this should fail too + // But consider the case where this code is elsewhere (in the Dispose method of a class holding this logic) + try + { + while (await reader.NextResultAsync()) + { + } + } + catch + { + Interlocked.Increment(ref _poisonCleanUpExceptions); + } + + throw; + } + } + } + finally + { + // Make sure to clean up our time bomb + // It is unlikely, but the timebomb may get delayed in the Task Queue + // And we don't want it running after we dispose the command + if (timeBombTask != default) + { + await timeBombTask; + } + } + } + } + finally + { + Interlocked.Add(ref _rowsRead, rowsRead); + Interlocked.Add(ref _resultRead, resultRead); + if (poison) + { + Interlocked.Increment(ref _poisonedEnded); + } + } + } + } + catch (Exception ex) + { + if (!poison) + { + Interlocked.Increment(ref _nonPoisonedExceptions); + + string details = ex.ToString(); + details = details.Substring(0, Math.Min(200, details.Length)); + lock (_exceptionDetails) + { + _exceptionDetails.Add(details); + } + } + + if (ex.Message.Contains("The MARS TDS header contained errors.")) + { + _continue = false; + if (_found == 0) // This check is not really safe we may list more than one. + { + lock (_lockObject) + { + // You will notice that poison will be likely be false here, it is the normal commands that suffer + // Once we have successfully poisoned the connection pool, we may start to see some other request to poison fail just like the normal requests + _output.WriteLine($"{poison} {DateTime.UtcNow.ToString("O")}"); + _output.WriteLine(ex.ToString()); + } + } + Interlocked.Increment(ref _found); + } + } + } + + private async Task TimeBombAsync(SqlCommand command) + { + await SleepAsync(100, 3000); + command.Cancel(); + } + + private async Task SleepAsync(int minMs, int maxMs) + { + int delayMs; + lock (_random) + { + delayMs = _random.Next(minMs, maxMs); + } + await Task.Delay(delayMs); + } + + private Stopwatch _watch; + + private int _inFlight; + private int _start; + private int _done; + private int _rowsRead; + private int _resultRead; + private int _nonPoisonedExceptions; + private int _poisonedEnded; + private int _poisonCleanUpExceptions; + private bool _continue = true; + private int _found; + private Random _random; + private object _lockObject = new object(); + + private HashSet _exceptionDetails = new HashSet(); + } +}