Skip to content

Commit

Permalink
[Release 1.1] Enable context switch by default to fix MARS TDS Header…
Browse files Browse the repository at this point in the history
… errors (#959)

Reverting PR 949 as that change is not to be made in v1.1 right now.
  • Loading branch information
cheenamalhotra authored Mar 4, 2021
1 parent 383c6b7 commit 4bac2bd
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4239,15 +4239,6 @@ internal void ResetSnapshotState()
_stateObj._cleanupMetaData = _snapshotCleanupMetaData;
_stateObj._cleanupAltMetaDataSetArray = _snapshotCleanupAltMetaDataSetArray;

// Make sure to go through the appropriate increment/decrement methods if changing the OpenResult flag
if (!_stateObj.HasOpenResult && _state.HasFlag(SnapshottedStateFlags.OpenResult))
{
_stateObj.IncrementAndObtainOpenResultCount(_stateObj._executedUnderTransaction);
}
else if (_stateObj.HasOpenResult && !_state.HasFlag(SnapshottedStateFlags.OpenResult))
{
_stateObj.DecrementOpenResultCount();
}
_stateObj._snapshottedState = _state;

// Reset partially read state (these only need to be maintained if doing async without snapshot)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ static partial void PopulateDefaultValuesPartial(string platformIdentifier, stri
case ".NETCore":
case ".NETFramework":
{
if (version <= 40702)
{
LocalAppContext.DefineSwitchDefault(LocalAppContextSwitches.MakeReadAsyncBlockingString, true);
}
LocalAppContext.DefineSwitchDefault(LocalAppContextSwitches.MakeReadAsyncBlockingString, true);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4337,18 +4337,7 @@ internal void ResetSnapshotState()
_stateObj._nullBitmapInfo = _snapshotNullBitmapInfo;
_stateObj._cleanupMetaData = _snapshotCleanupMetaData;
_stateObj._cleanupAltMetaDataSetArray = _snapshotCleanupAltMetaDataSetArray;

// Make sure to go through the appropriate increment/decrement methods if changing HasOpenResult
if (!_stateObj._hasOpenResult && _snapshotHasOpenResult)
{
_stateObj.IncrementAndObtainOpenResultCount(_stateObj._executedUnderTransaction);
}
else if (_stateObj._hasOpenResult && !_snapshotHasOpenResult)
{
_stateObj.DecrementOpenResultCount();
}
//else _stateObj._hasOpenResult is already == _snapshotHasOpenResult

_stateObj._hasOpenResult = _snapshotHasOpenResult;
_stateObj._receivedColMetaData = _snapshotReceivedColumnMetadata;
_stateObj._attentionReceived = _snapshotAttentionReceived;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,7 @@ public AsyncCancelledConnectionsTest(ITestOutputHelper output)
[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))]
public void CancelAsyncConnections()
{
SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder(DataTestUtility.TCPConnectionString);
builder.MultipleActiveResultSets = false;
RunCancelAsyncConnections(builder, false);
RunCancelAsyncConnections(builder, true);
builder.MultipleActiveResultSets = true;
RunCancelAsyncConnections(builder, false);
RunCancelAsyncConnections(builder, true);
}

private void RunCancelAsyncConnections(SqlConnectionStringBuilder connectionStringBuilder, bool makeAsyncBlocking)
{
SqlConnection.ClearAllPools();
AppContext.SetSwitch("Switch.Microsoft.Data.SqlClient.MakeReadAsyncBlocking", makeAsyncBlocking);

string connectionString = DataTestUtility.TCPConnectionString;
_watch = Stopwatch.StartNew();
_random = new Random(4); // chosen via fair dice role.
ParallelLoopResult results = new ParallelLoopResult();
Expand All @@ -50,7 +37,7 @@ private void RunCancelAsyncConnections(SqlConnectionStringBuilder connectionStri
results = Parallel.For(
fromInclusive: 0,
toExclusive: NumberOfTasks,
(int i) => DoManyAsync(connectionStringBuilder).GetAwaiter().GetResult());
(int i) => DoManyAsync(connectionString).GetAwaiter().GetResult());
}
}
catch (Exception ex)
Expand Down Expand Up @@ -90,26 +77,18 @@ private void DisplaySummary()
}

// This is the the main body that our Tasks run
private async Task DoManyAsync(SqlConnectionStringBuilder connectionStringBuilder)
private async Task DoManyAsync(string connectionString)
{
Interlocked.Increment(ref _start);
Interlocked.Increment(ref _inFlight);

using (SqlConnection marsConnection = new SqlConnection(connectionStringBuilder.ToString()))
{
if (connectionStringBuilder.MultipleActiveResultSets)
{
await marsConnection.OpenAsync();
}

// First poison
await DoOneAsync(marsConnection, connectionStringBuilder.ToString(), poison: true);
// First poison
await DoOneAsync(connectionString, poison: true);

for (int i = 0; i < NumberOfNonPoisoned && _continue; i++)
{
// now run some without poisoning
await DoOneAsync(marsConnection, connectionStringBuilder.ToString());
}
for (int i = 0; i < NumberOfNonPoisoned && _continue; i++)
{
// now run some without poisoning
await DoOneAsync(connectionString);
}

Interlocked.Decrement(ref _inFlight);
Expand All @@ -120,30 +99,95 @@ private async Task DoManyAsync(SqlConnectionStringBuilder connectionStringBuilde
// if we are poisoning we will
// 1 - Interject some sleeps in the sql statement so that it will run long enough that we can cancel it
// 2 - Setup a time bomb task that will cancel the command a random amount of time later
private async Task DoOneAsync(SqlConnection marsConnection, string connectionString, bool poison = false)
private async Task DoOneAsync(string connectionString, bool poison = false)
{
try
{
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 4; i++)
using (var connection = new SqlConnection(connectionString))
{
builder.AppendLine("SELECT name FROM sys.tables");
if (poison && i < 3)
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 4; i++)
{
builder.AppendLine("WAITFOR DELAY '00:00:01'");
builder.AppendLine("SELECT name FROM sys.tables");
if (poison && i < 3)
{
builder.AppendLine("WAITFOR DELAY '00:00:01'");
}
}
}

using (var connection = new SqlConnection(connectionString))
{
if (marsConnection != null && marsConnection.State == System.Data.ConnectionState.Open)
int rowsRead = 0;
int resultRead = 0;

try
{
await RunCommand(marsConnection, builder.ToString(), poison);
await connection.OpenAsync();
using (var command = connection.CreateCommand())
{
Task timeBombTask = default;
try
{
// Setup our time bomb
if (poison)
{
timeBombTask = TimeBombAsync(command);
}

command.CommandText = builder.ToString();

// Attempt to read all of the data
using (var reader = await command.ExecuteReaderAsync())
{
try
{
do
{
resultRead++;
while (await reader.ReadAsync() && _continue)
{
rowsRead++;
}
}
while (await reader.NextResultAsync() && _continue);
}
catch when (poison)
{
// This looks a little strange, we failed to read above so this should fail too
// But consider the case where this code is elsewhere (in the Dispose method of a class holding this logic)
try
{
while (await reader.NextResultAsync())
{
}
}
catch
{
Interlocked.Increment(ref _poisonCleanUpExceptions);
}

throw;
}
}
}
finally
{
// Make sure to clean up our time bomb
// It is unlikely, but the timebomb may get delayed in the Task Queue
// And we don't want it running after we dispose the command
if (timeBombTask != default)
{
await timeBombTask;
}
}
}
}
else
finally
{
await connection.OpenAsync();
await RunCommand(connection, builder.ToString(), poison);
Interlocked.Add(ref _rowsRead, rowsRead);
Interlocked.Add(ref _resultRead, resultRead);
if (poison)
{
Interlocked.Increment(ref _poisonedEnded);
}
}
}
}
Expand Down Expand Up @@ -179,83 +223,6 @@ private async Task DoOneAsync(SqlConnection marsConnection, string connectionStr
}
}

private async Task RunCommand(SqlConnection connection, string commandText, bool poison)
{
int rowsRead = 0;
int resultRead = 0;

try
{
using (var command = connection.CreateCommand())
{
Task timeBombTask = default;
try
{
// Setup our time bomb
if (poison)
{
timeBombTask = TimeBombAsync(command);
}

command.CommandText = commandText;

// Attempt to read all of the data
using (var reader = await command.ExecuteReaderAsync())
{
try
{
do
{
resultRead++;
while (await reader.ReadAsync() && _continue)
{
rowsRead++;
}
}
while (await reader.NextResultAsync() && _continue);
}
catch when (poison)
{
// This looks a little strange, we failed to read above so this should fail too
// But consider the case where this code is elsewhere (in the Dispose method of a class holding this logic)
try
{
while (await reader.NextResultAsync())
{
}
}
catch
{
Interlocked.Increment(ref _poisonCleanUpExceptions);
}

throw;
}
}
}
finally
{
// Make sure to clean up our time bomb
// It is unlikely, but the timebomb may get delayed in the Task Queue
// And we don't want it running after we dispose the command
if (timeBombTask != default)
{
await timeBombTask;
}
}
}
}
finally
{
Interlocked.Add(ref _rowsRead, rowsRead);
Interlocked.Add(ref _resultRead, resultRead);
if (poison)
{
Interlocked.Increment(ref _poisonedEnded);
}
}
}

private async Task TimeBombAsync(SqlCommand command)
{
await SleepAsync(100, 3000);
Expand Down

0 comments on commit 4bac2bd

Please sign in to comment.