Skip to content

Commit

Permalink
Fixes for #8026 and possibly flaky Test: AsyncQueryTestBase.Mixed_syn…
Browse files Browse the repository at this point in the history
…c_async_query (part of #7160)

- Moved query async Semaphore to RelationalConnection so multiple top-level queries can be serialized.
- Moved active query buffer management to RelationalConnection so buffering works across multiple top-level queries.
- Renamed IValueBufferCursor to IBufferable
  • Loading branch information
anpete committed May 16, 2017
1 parent 99d6df6 commit b71afe6
Show file tree
Hide file tree
Showing 15 changed files with 279 additions and 148 deletions.
56 changes: 41 additions & 15 deletions src/EFCore.Relational/Query/Internal/AsyncQueryingEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
Expand Down Expand Up @@ -42,7 +43,7 @@ public AsyncQueryingEnumerable(
/// </summary>
public virtual IAsyncEnumerator<T> GetEnumerator() => new AsyncEnumerator(this);

private sealed class AsyncEnumerator : IAsyncEnumerator<T>, IValueBufferCursor
private sealed class AsyncEnumerator : IAsyncEnumerator<T>, IBufferable
{
private readonly RelationalQueryContext _relationalQueryContext;
private readonly ShaperCommandContext _shaperCommandContext;
Expand All @@ -69,13 +70,14 @@ public async Task<bool> MoveNext(CancellationToken cancellationToken)

try
{
await _relationalQueryContext.Semaphore.WaitAsync(cancellationToken);
await _relationalQueryContext.Connection.Semaphore.WaitAsync(cancellationToken);

if (_buffer == null)
{
var executionStrategy = _relationalQueryContext.ExecutionStrategyFactory.Create();

return await executionStrategy.ExecuteAsync(BufferlessMoveNext, executionStrategy.RetriesOnFailure, cancellationToken);
return await executionStrategy
.ExecuteAsync(BufferlessMoveNext, executionStrategy.RetriesOnFailure, cancellationToken);
}

if (_buffer.Count > 0)
Expand All @@ -89,7 +91,7 @@ public async Task<bool> MoveNext(CancellationToken cancellationToken)
}
finally
{
_relationalQueryContext.Semaphore.Release();
_relationalQueryContext.Connection.Semaphore.Release();
}
}

Expand All @@ -99,15 +101,14 @@ private async Task<bool> BufferlessMoveNext(bool buffer, CancellationToken cance
{
if (_dataReader == null)
{
await _relationalQueryContext.Connection
.OpenAsync(cancellationToken);
await _relationalQueryContext.Connection.OpenAsync(cancellationToken);

var relationalCommand
= _shaperCommandContext
.GetRelationalCommand(_relationalQueryContext.ParameterValues);

await _relationalQueryContext
.RegisterValueBufferCursorAsync(this, cancellationToken);
await _relationalQueryContext.Connection
.RegisterBufferableAsync(this, cancellationToken);

_dataReader
= await relationalCommand.ExecuteReaderAsync(
Expand Down Expand Up @@ -136,7 +137,6 @@ await _relationalQueryContext
}
catch (Exception)
{
_relationalQueryContext.DeregisterValueBufferCursor(this);
_dataReader = null;
_dbDataReader = null;

Expand All @@ -150,7 +150,8 @@ public async Task BufferAllAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (_buffer == null)
if (_buffer == null
&& _dataReader != null)
{
_buffer = new Queue<ValueBuffer>();

Expand All @@ -170,25 +171,50 @@ public async Task BufferAllAsync(CancellationToken cancellationToken)

public void BufferAll()
{
throw new NotImplementedException();
if (_buffer == null
&& _dataReader != null)
{
_buffer = new Queue<ValueBuffer>();

using (_dataReader)
{
while (_dbDataReader.Read())
{
_buffer.Enqueue(_valueBufferFactory.Create(_dbDataReader));
}
}

_relationalQueryContext.Connection?.Close();

_dataReader = null;
_dbDataReader = null;
}
}

public void Dispose()
{
if (!_disposed)
{
lock (_relationalQueryContext)
try
{
_relationalQueryContext.DeregisterValueBufferCursor(this);
_relationalQueryContext.Connection.Semaphore.Wait();

if (_dataReader != null)
{
_dataReader.Dispose();
_dataReader = null;
_dbDataReader = null;
_buffer = null;

_relationalQueryContext.Connection?.Close();
}
}

_disposed = true;
_disposed = true;
}
finally
{
_relationalQueryContext.Connection.Semaphore.Release();
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/EFCore.Relational/Query/Internal/IValueBufferCursor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Microsoft.EntityFrameworkCore.Query.Internal
/// This API supports the Entity Framework Core infrastructure and is not intended to be used
/// directly from your code. This API may change or be removed in future releases.
/// </summary>
public interface IValueBufferCursor
public interface IBufferable
{
/// <summary>
/// This API supports the Entity Framework Core infrastructure and is not intended to be used
Expand Down
49 changes: 35 additions & 14 deletions src/EFCore.Relational/Query/Internal/QueryingEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections;
using System.Collections.Generic;
using System.Data.Common;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
Expand Down Expand Up @@ -45,7 +46,7 @@ public QueryingEnumerable(

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

private sealed class Enumerator : IEnumerator<T>, IValueBufferCursor
private sealed class Enumerator : IEnumerator<T>, IBufferable
{
private readonly RelationalQueryContext _relationalQueryContext;
private readonly ShaperCommandContext _shaperCommandContext;
Expand All @@ -56,8 +57,6 @@ private sealed class Enumerator : IEnumerator<T>, IValueBufferCursor
private DbDataReader _dbDataReader;
private IRelationalValueBufferFactory _valueBufferFactory;

private T _current;

private bool _disposed;

public Enumerator(QueryingEnumerable<T> queryingEnumerable)
Expand All @@ -79,7 +78,7 @@ public bool MoveNext()

if (_buffer.Count > 0)
{
_current = _shaper.Shape(_relationalQueryContext, _buffer.Dequeue());
Current = _shaper.Shape(_relationalQueryContext, _buffer.Dequeue());

return true;
}
Expand All @@ -99,7 +98,7 @@ var relationalCommand
= _shaperCommandContext
.GetRelationalCommand(_relationalQueryContext.ParameterValues);

_relationalQueryContext.RegisterValueBufferCursor(this);
_relationalQueryContext.Connection.RegisterBufferable(this);

_dataReader
= relationalCommand.ExecuteReader(
Expand All @@ -113,7 +112,7 @@ var relationalCommand

var hasNext = _dbDataReader.Read();

_current
Current
= hasNext
? _shaper.Shape(_relationalQueryContext, _valueBufferFactory.Create(_dbDataReader))
: default(T);
Expand All @@ -127,20 +126,19 @@ var relationalCommand
}
catch (Exception)
{
_relationalQueryContext.DeregisterValueBufferCursor(this);
_dataReader = null;
_dbDataReader = null;

throw;
}
}

// ReSharper disable once ConvertToAutoPropertyWithPrivateSetter
public T Current => _current;
public T Current { get; private set; }

public void BufferAll()
{
if (_buffer == null)
if (_buffer == null
&& _dataReader != null)
{
_buffer = new Queue<ValueBuffer>();

Expand All @@ -153,25 +151,48 @@ public void BufferAll()
}

_relationalQueryContext.Connection?.Close();

_dataReader = null;
_dbDataReader = null;
}
}

public Task BufferAllAsync(CancellationToken cancellationToken)
=> throw new NotImplementedException();
public async Task BufferAllAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (_buffer == null
&& _dataReader != null)
{
_buffer = new Queue<ValueBuffer>();

using (_dataReader)
{
while (await _dbDataReader.ReadAsync(cancellationToken))
{
_buffer.Enqueue(_valueBufferFactory.Create(_dbDataReader));
}
}

_relationalQueryContext.Connection?.Close();
_dataReader = null;
_dbDataReader = null;
}
}

object IEnumerator.Current => Current;

public void Dispose()
{
if (!_disposed)
{
_relationalQueryContext.DeregisterValueBufferCursor(this);

if (_dataReader != null)
{
_dataReader.Dispose();
_dataReader = null;
_dbDataReader = null;
_buffer = null;

_relationalQueryContext.Connection?.Close();
}

Expand Down
69 changes: 0 additions & 69 deletions src/EFCore.Relational/Query/RelationalQueryContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore.Query.Internal;
using Microsoft.EntityFrameworkCore.Storage;
Expand All @@ -18,8 +14,6 @@ namespace Microsoft.EntityFrameworkCore.Query
/// </summary>
public class RelationalQueryContext : QueryContext
{
private readonly List<IValueBufferCursor> _activeQueries = new List<IValueBufferCursor>();

/// <summary>
/// This API supports the Entity Framework Core infrastructure and is not intended to be used
/// directly from your code. This API may change or be removed in future releases.
Expand All @@ -46,75 +40,12 @@ public RelationalQueryContext(
/// </value>
public virtual IRelationalConnection Connection { get; }

/// <summary>
/// Gets a semaphore used to serialize async queries.
/// </summary>
/// <value>
/// The semaphore.
/// </value>
public virtual SemaphoreSlim Semaphore { get; } = new SemaphoreSlim(1);

/// <summary>
/// The execution strategy factory.
/// </summary>
/// <value>
/// The execution strategy factory.
/// </value>
public virtual IExecutionStrategyFactory ExecutionStrategyFactory { get; }

/// <summary>
/// Registers a value buffer cursor.
/// </summary>
/// <param name="valueBufferCursor"> The value buffer cursor. </param>
public virtual void RegisterValueBufferCursor([NotNull] IValueBufferCursor valueBufferCursor)
{
Check.NotNull(valueBufferCursor, nameof(valueBufferCursor));

if (_activeQueries.Count > 0
&& !Connection.IsMultipleActiveResultSetsEnabled)
{
_activeQueries.Last().BufferAll();
}

_activeQueries.Add(valueBufferCursor);
}

/// <summary>
/// Asynchronously registers a value buffer cursor.
/// </summary>
/// <param name="valueBufferCursor"> The value buffer cursor. </param>
/// <param name="cancellationToken"> The cancellation token. </param>
/// <returns>
/// A Task.
/// </returns>
public virtual async Task RegisterValueBufferCursorAsync(
[NotNull] IValueBufferCursor valueBufferCursor,
CancellationToken cancellationToken)
{
Check.NotNull(valueBufferCursor, nameof(valueBufferCursor));

if (Connection.ActiveCursor != null
&& !Connection.IsMultipleActiveResultSetsEnabled)
{
await Connection.ActiveCursor.BufferAllAsync(cancellationToken);
}

Connection.ActiveCursor = valueBufferCursor;

_activeQueries.Add(valueBufferCursor);
}

/// <summary>
/// Deregisters the value buffer cursor described by valueBufferCursor.
/// </summary>
/// <param name="valueBufferCursor"> The value buffer cursor. </param>
public virtual void DeregisterValueBufferCursor([NotNull] IValueBufferCursor valueBufferCursor)
{
Check.NotNull(valueBufferCursor, nameof(valueBufferCursor));

Connection.ActiveCursor = null;

_activeQueries.Remove(valueBufferCursor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public static async Task<int> ExecuteSqlCommandAsync(

var concurrencyDetector = databaseFacade.GetService<IConcurrencyDetector>();

using (concurrencyDetector.EnterCriticalSection())
using (await concurrencyDetector.EnterCriticalSectionAsync(cancellationToken))
{
var rawSqlCommand = databaseFacade
.GetRelationalService<IRawSqlCommandBuilder>()
Expand Down
Loading

0 comments on commit b71afe6

Please sign in to comment.