Skip to content

Commit

Permalink
Implement rows affected
Browse files Browse the repository at this point in the history
  • Loading branch information
roji committed Aug 3, 2022
1 parent a2eb561 commit 8c6658c
Show file tree
Hide file tree
Showing 17 changed files with 530 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface IStoreStoredProcedureParameter : IColumnBase
/// Gets the property mappings.
/// </summary>
new IReadOnlyList<IStoredProcedureParameterMapping> PropertyMappings { get; }

/// <summary>
/// Gets the direction of the parameter.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,7 @@ public virtual bool AddParameterMapping(IStoredProcedureParameterMapping paramet
}

ParameterMappings.Add(parameterMapping);

// TODO: I'm removing this to preserve the user-provided parameter ordering; this is important for correct positional parameter
// handling. Should confirm that it's OK to rely in UpdateSqlGenerator on the order of ColumnModification corresponding to the
// order of parameters are specified by the user in the model.
// ParameterMappings.Sort(ColumnMappingBaseComparer.Instance);
ParameterMappings.Sort(ColumnMappingBaseComparer.Instance);

return true;
}
Expand Down
153 changes: 95 additions & 58 deletions src/EFCore.Relational/Update/AffectedCountModificationCommandBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ protected override void Consume(RelationalDataReader reader)
try
{
bool? onResultSet = null;
var hasOutputParameters = false;

for (; commandIndex < CommandResultSet.Count; commandIndex++)
{
var resultSetMapping = CommandResultSet[commandIndex];
var command = ModificationCommands[commandIndex];
var requiresResultPropagation = command.RequiresResultPropagation;

if (resultSetMapping.HasFlag(ResultSetMapping.HasResultRow))
{
Expand All @@ -57,54 +57,72 @@ protected override void Consume(RelationalDataReader reader)
Check.DebugFail("Missing a result set");
}

var lastHandledCommandIndex = requiresResultPropagation
var lastHandledCommandIndex = command.RequiresResultPropagation
? ConsumeResultSetWithPropagation(commandIndex, reader)
: ConsumeResultSetWithoutPropagation(commandIndex, reader);

Check.DebugAssert(resultSetMapping.HasFlag(ResultSetMapping.LastInResultSet)
? lastHandledCommandIndex == commandIndex
: lastHandledCommandIndex > commandIndex, "Bad handling of ResultSetMapping and command indexing");

commandIndex = lastHandledCommandIndex;

onResultSet = reader.DbDataReader.NextResult();
}

if (resultSetMapping.HasFlag(ResultSetMapping.HasOutputParameters))
{
hasOutputParameters = true;
}
}

Debug.Assert(onResultSet != true, "Unexpected result set found at end");

if (hasOutputParameters)
{
reader.DbDataReader.Close();

var parameterCounter = 0;

// Process all output parameters for the rows in the previous result set (they're not populated until NextResult
// is called on SQL Server)
// TODO: Maybe record if there's any sprocs in the batch, and if not, skip this
while (true)
for (commandIndex = 0; commandIndex < CommandResultSet.Count; commandIndex++)
{
if (!CommandResultSet[commandIndex].HasFlag(ResultSetMapping.HasOutputParameters))
{
if (resultSetMapping.HasFlag(ResultSetMapping.HasOutputParameters))
continue;
}

var command = ModificationCommands[commandIndex];

var rowsAffectedDbParameter = command.RowsAffectedParameter is not null
? reader.DbCommand.Parameters[parameterCounter + command.RowsAffectedParameter.Position]
: command.StoredProcedure!.Return is not null
? reader.DbCommand.Parameters[parameterCounter] // TODO: Assumption that the return value is the 1st parameter.
: null;

if (rowsAffectedDbParameter is not null)
{
if (rowsAffectedDbParameter.Value is int rowsAffected)
{
HandleOutputParameters();
if (rowsAffected != 1)
{
ThrowAggregateUpdateConcurrencyException(
reader, commandIndex + 1, expectedRowsAffected: 1, rowsAffected: 0);
}
}

if (commandIndex == lastHandledCommandIndex)
else
{
break;
throw new Exception(); // TODO
}

resultSetMapping = CommandResultSet[++commandIndex];
command = ModificationCommands[commandIndex];
requiresResultPropagation = command.RequiresResultPropagation;
}
}
else if (resultSetMapping.HasFlag(ResultSetMapping.HasOutputParameters))
{
// Handle output parameters for commands which have no result rows. Commands with result rows are handled above.
HandleOutputParameters();
}

void HandleOutputParameters()
{
if (requiresResultPropagation)
if (command.RequiresResultPropagation)
{
command.PropagateOutputParameters(reader.DbCommand.Parameters);
}
command.PropagateOutputParameters(reader.DbCommand.Parameters, parameterCounter);

// TODO: Rows affected via output parameter
parameterCounter += command.StoredProcedure!.Parameters.Count;
}
}
}

Debug.Assert(onResultSet != true, "Unexpected result set found at end");
}
catch (Exception ex) when (ex is not DbUpdateException and not OperationCanceledException)
{
Expand Down Expand Up @@ -135,12 +153,12 @@ protected override async Task ConsumeAsync(
try
{
bool? onResultSet = null;
var hasOutputParameters = false;

for (; commandIndex < CommandResultSet.Count; commandIndex++)
{
var resultSetMapping = CommandResultSet[commandIndex];
var command = ModificationCommands[commandIndex];
var requiresResultPropagation = command.RequiresResultPropagation;

if (resultSetMapping.HasFlag(ResultSetMapping.HasResultRow))
{
Expand All @@ -149,54 +167,73 @@ protected override async Task ConsumeAsync(
Check.DebugFail("Missing a result set");
}

var lastHandledCommandIndex = requiresResultPropagation
var lastHandledCommandIndex = command.RequiresResultPropagation
? await ConsumeResultSetWithPropagationAsync(commandIndex, reader, cancellationToken).ConfigureAwait(false)
: await ConsumeResultSetWithoutPropagationAsync(commandIndex, reader, cancellationToken).ConfigureAwait(false);

Check.DebugAssert(resultSetMapping.HasFlag(ResultSetMapping.LastInResultSet)
? lastHandledCommandIndex == commandIndex
: lastHandledCommandIndex > commandIndex, "Bad handling of ResultSetMapping and command indexing");

commandIndex = lastHandledCommandIndex;

onResultSet = await reader.DbDataReader.NextResultAsync(cancellationToken).ConfigureAwait(false);
}

if (resultSetMapping.HasFlag(ResultSetMapping.HasOutputParameters))
{
hasOutputParameters = true;
}
}

Debug.Assert(onResultSet != true, "Unexpected result set found at end");

if (hasOutputParameters)
{
await reader.DbDataReader.CloseAsync().ConfigureAwait(false);

var parameterCounter = 0;

// Process all output parameters for the rows in the previous result set (they're not populated until NextResult
// is called on SQL Server)
// TODO: Maybe record if there's any sprocs in the batch, and if not, skip this
while (true)
for (commandIndex = 0; commandIndex < CommandResultSet.Count; commandIndex++)
{
if (!CommandResultSet[commandIndex].HasFlag(ResultSetMapping.HasOutputParameters))
{
if (resultSetMapping.HasFlag(ResultSetMapping.HasOutputParameters))
continue;
}

var command = ModificationCommands[commandIndex];

var rowsAffectedDbParameter = command.RowsAffectedParameter is not null
? reader.DbCommand.Parameters[parameterCounter + command.RowsAffectedParameter.Position]
: command.StoredProcedure!.Return is not null
? reader.DbCommand.Parameters[parameterCounter] // TODO: Assumption that the return value is the 1st parameter.
: null;

if (rowsAffectedDbParameter is not null)
{
if (rowsAffectedDbParameter.Value is int rowsAffected)
{
HandleOutputParameters();
if (rowsAffected != 1)
{
await ThrowAggregateUpdateConcurrencyExceptionAsync(
reader, commandIndex + 1, expectedRowsAffected: 1, rowsAffected: 0, cancellationToken)
.ConfigureAwait(false);
}
}

if (commandIndex == lastHandledCommandIndex)
else
{
break;
throw new Exception(); // TODO
}

resultSetMapping = CommandResultSet[++commandIndex];
command = ModificationCommands[commandIndex];
requiresResultPropagation = command.RequiresResultPropagation;
}
}
else if (resultSetMapping.HasFlag(ResultSetMapping.HasOutputParameters))
{
// Handle output parameters for commands which have no result rows. Commands with result rows are handled above.
HandleOutputParameters();
}

void HandleOutputParameters()
{
if (requiresResultPropagation)
if (command.RequiresResultPropagation)
{
command.PropagateOutputParameters(reader.DbCommand.Parameters);
}
command.PropagateOutputParameters(reader.DbCommand.Parameters, parameterCounter);

// TODO: Rows affected via output parameter
parameterCounter += command.StoredProcedure!.Parameters.Count;
}
}
}

Debug.Assert(onResultSet != true, "Unexpected result set found at end");
}
catch (Exception ex) when (ex is not DbUpdateException and not OperationCanceledException)
{
Expand Down
5 changes: 2 additions & 3 deletions src/EFCore.Relational/Update/ColumnModification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,15 @@ public virtual bool UseOriginalValueParameter

/// <inheritdoc />
public virtual bool UseCurrentValueParameter
=> UseParameter && UseCurrentValue;
=> (UseParameter && UseCurrentValue) || (IsRead && Column is IStoreStoredProcedureParameter or IStoreStoredProcedureReturn);

/// <inheritdoc />
public virtual bool UseOriginalValue
=> IsCondition;

/// <inheritdoc />
public virtual bool UseCurrentValue
// TODO: This may be somewhat hacky, as there's no actual value.
=> IsWrite || (IsRead && Column is IStoreStoredProcedureParameter);
=> IsWrite;

/// <inheritdoc />
public virtual bool UseParameter { get; }
Expand Down
15 changes: 14 additions & 1 deletion src/EFCore.Relational/Update/IReadOnlyModificationCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ public interface IReadOnlyModificationCommand
/// </summary>
public EntityState EntityState { get; }

/// <summary>
/// When using a stored procedure, this optionally points to the output parameter containing the rows affected.
/// </summary>
public IStoreStoredProcedureParameter? RowsAffectedParameter { get; }

/// <summary>
/// When using a stored procedure, this optionally points to the result column containing the rows affected.
/// </summary>
public IStoreStoredProcedureResultColumn? RowsAffectedResultColumn { get; }

/// <summary>
/// Reads result set columns returned from the database in the given <paramref name="relationalReader" /> and propagates them back
/// to into the appropriate <see cref="IColumnModification" /> from which the values can be propagated on to tracked entities.
Expand All @@ -73,5 +83,8 @@ public interface IReadOnlyModificationCommand
/// to into the appropriate <see cref="IColumnModification" /> from which the values can be propagated on to tracked entities.
/// </summary>
/// <param name="parameterCollection">The parameter collection from which to propagate output values.</param>
public void PropagateOutputParameters(DbParameterCollection parameterCollection);
/// <param name="baseParameterIndex">
/// The index in <paramref name="parameterCollection" /> on which parameters for this <see cref="ModificationCommand" /> begin.
/// </param>
public void PropagateOutputParameters(DbParameterCollection parameterCollection, int baseParameterIndex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace Microsoft.EntityFrameworkCore.Update.Internal;
public class CommandBatchPreparer : ICommandBatchPreparer
{
private readonly int _minBatchSize;
private readonly RelationalTypeMapping _intTypeMapping;
private readonly bool _sensitiveLoggingEnabled;
private readonly bool _detailedErrorsEnabled;
private readonly Multigraph<IReadOnlyModificationCommand, IAnnotatable> _modificationCommandGraph;
Expand All @@ -31,6 +32,8 @@ public CommandBatchPreparer(CommandBatchPreparerDependencies dependencies)
dependencies.Options.Extensions.OfType<RelationalOptionsExtension>().FirstOrDefault()?.MinBatchSize
?? 1;

_intTypeMapping = dependencies.RelationalTypeMappingSource.FindMapping(typeof(int))!;

_modificationCommandGraph = new(dependencies.ModificationCommandComparer);
Dependencies = dependencies;

Expand Down Expand Up @@ -200,7 +203,7 @@ protected virtual IEnumerable<IReadOnlyModificationCommand> CreateModificationCo

var command = Dependencies.ModificationCommandFactory.CreateModificationCommand(
new ModificationCommandParameters(
sproc.StoreStoredProcedure, _sensitiveLoggingEnabled, _detailedErrorsEnabled, comparer: null,
sproc.StoreStoredProcedure, _intTypeMapping, _sensitiveLoggingEnabled, _detailedErrorsEnabled, comparer: null,
generateParameterName, Dependencies.UpdateLogger));
command.AddEntry(entry, mainEntry: true);
commands.Add(command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public CommandBatchPreparerDependencies(
IParameterNameGeneratorFactory parameterNameGeneratorFactory,
IComparer<IReadOnlyModificationCommand> modificationCommandComparer,
IModificationCommandFactory modificationCommandFactory,
IRelationalTypeMappingSource relationalTypeMappingSource,
ILoggingOptions loggingOptions,
IDiagnosticsLogger<DbLoggerCategory.Update> updateLogger,
IDbContextOptions options)
Expand All @@ -58,6 +59,7 @@ public CommandBatchPreparerDependencies(
ParameterNameGeneratorFactory = parameterNameGeneratorFactory;
ModificationCommandComparer = modificationCommandComparer;
ModificationCommandFactory = modificationCommandFactory;
RelationalTypeMappingSource = relationalTypeMappingSource;
LoggingOptions = loggingOptions;
UpdateLogger = updateLogger;
Options = options;
Expand Down Expand Up @@ -95,6 +97,14 @@ public CommandBatchPreparerDependencies(
/// </summary>
public IModificationCommandFactory ModificationCommandFactory { get; init; }

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public IRelationalTypeMappingSource RelationalTypeMappingSource { get; init; }

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
Expand Down
Loading

0 comments on commit 8c6658c

Please sign in to comment.