Skip to content

Commit

Permalink
Merge pull request #637 from mysql-net/batch-commands
Browse files Browse the repository at this point in the history
Add batch command API.
  • Loading branch information
bgrainger authored Jun 23, 2019
2 parents d6cafdd + 0d6df3c commit 1ddaa43
Show file tree
Hide file tree
Showing 36 changed files with 1,831 additions and 463 deletions.
94 changes: 94 additions & 0 deletions docs/content/api/mysql-batch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
---
date: 2019-06-23
menu:
main:
parent: api
title: MySqlBatch
weight: 10
---

# MySqlBatch

`MySqlBatch` implements the new [ADO.NET batching API](https://github.com/dotnet/corefx/issues/35135).
**It is currently experimental** and may change in the future.

When using MariaDB (10.2 or later), the commands will be sent in a single batch, reducing network
round-trip time. With other MySQL Servers, this may be no more efficient than executing the commands
individually.

## Example Code

```csharp
using (var connection = new MySqlConnection("...connection string..."))
{
await connection.OpenAsync();
using (var batch = new MySqlBatch(connection)
{
BatchCommands =
{
new MySqlBatchCommand("INSERT INTO departments(name) VALUES(@name);")
{
Parameters =
{
new MySqlParameter("@name", "Sales"),
},
},
new MySqlBatchCommand("SET @dept_id = last_insert_id()"),
new MySqlBatchCommand("INSERT INTO employees(name, department_id) VALUES(@name, @dept_id);")
{
Parameters =
{
new MySqlParameter("@name", "Jim Halpert"),
},
},
new MySqlBatchCommand("INSERT INTO employees(name, department_id) VALUES(@name, @dept_id);")
{
Parameters =
{
new MySqlParameter("@name", "Dwight Schrute"),
},
},
},
})
{
await batch.ExecuteNonQueryAsync();
}
}
```

## API Reference

### Constructors
`public MySqlBatch()`

Parameterless constructor.
***
`public MySqlBatch(MySqlConnection connection)`

Constructor that accepts a `MySqlConnection` and sets the `Connection` property.

### Properties

`public MySqlBatchCommandCollection BatchCommands { get; }`

The collection of commands that will be executed in the batch.

### Methods

`public void ExecuteNonQuery();`
`public Task ExecuteNonQueryAsync();`

Executes all the commands in the batch, returning nothing.
***

`public object ExecuteScalar();`
`public Task<object> ExecuteScalarAsync();`

Executes all the commands in the batch, returning the value from the first column in the first row of the first resultset.
***

`public MySqlDataReader ExecuteReader();`
`public Task<DbDataReader> ExecuteReaderAsync();`

Executes all the commands in the batch, return a `DbDataReader` that can iterate over the result sets. If multiple
resultsets are returned, use `DbDataReader.NextResult` (or `NextResultAsync`) to access them.
63 changes: 37 additions & 26 deletions docs/content/api/mysql-connection.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
---
lastmod: 2017-11-06
lastmod: 2019-06-23
date: 2016-10-16
menu:
main:
parent: api
title: MySqlConnection
weight: 10
weight: 30
---

MySqlConnection
Expand All @@ -17,50 +17,61 @@ please refer to its documentation.
Additionally, MySqlConnection provides the following public properties and methods that may be used:

### Constructors
`public MySqlConnection()`
`MySqlConnection()`

Parameterless constructor
Parameterless constructor.
***
`public MySqlConnection(string connectionString)`
`MySqlConnection(string connectionString)`

Constructor that sets the `ConnectionString` property.

Constructor that set the connection string
***
### Additional Properties
`public int ServerThread`
`int ServerThread`

Connection ID from MySQL Server
Connection ID from MySQL Server.
***
`bool CanCreateBatch`

Returns `true`.

### Additional Instance Methods
`public Task<MySqlTransaction> BeginTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))`
`Task<MySqlTransaction> BeginTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))`

Async version of BeginTransaction
Async version of `BeginTransaction`.
***
`public Task<MySqlTransaction> BeginTransactionAsync(IsolationLevel isolationLevel, CancellationToken cancellationToken = default(CancellationToken))`
`Task<MySqlTransaction> BeginTransactionAsync(IsolationLevel isolationLevel, CancellationToken cancellationToken = default(CancellationToken))`

Async version of BeginTransaction that supports setting Isolation Level
Async version of `BeginTransaction` that supports setting Isolation Level.
***
### Additional Static Methods
`public static void ClearPool(MySqlConnection connection)`
`MySqlBatch CreateBatch()`

Clears the connection pool that the connection belongs to
Creates a `MySqlBatch` object for executing batched commands.
***
`public static Task ClearPoolAsync(MySqlConnection connection)`
`MySqlBatchCommand CreateBatchCommand()`

Async version of ClearPool
Creates a `MySqlBatchCommand` object (that can be used with `MySqlBatch.BatchCommands`).

### Additional Static Methods
`static void ClearPool(MySqlConnection connection)`

Clears the connection pool that the connection belongs to.
***
`public static Task ClearPoolAsync(MySqlConnection connection, CancellationToken cancellationToken)`
`static Task ClearPoolAsync(MySqlConnection connection)`

Async version of ClearPool with cancellation token support
Async version of `ClearPool`.
***
`public static void ClearAllPools()`
`static Task ClearPoolAsync(MySqlConnection connection, CancellationToken cancellationToken)`

Clears all connection pools in the entire application
Async version of `ClearPool` with cancellation token support.
***
`public static Task ClearAllPoolsAsync()`
`static void ClearAllPools()`

Async version of ClearAllPoolsAsync
Clears all connection pools in the entire application.
***
`public static Task ClearAllPoolsAsync(CancellationToken cancellationToken)`
`static Task ClearAllPoolsAsync()`

Async version of ClearAllPoolsAsync with cancellation token support
Async version of `ClearAllPoolsAsync`.
***
`static Task ClearAllPoolsAsync(CancellationToken cancellationToken)`

Async version of `ClearAllPoolsAsync` with cancellation token support.
2 changes: 1 addition & 1 deletion docs/content/api/mysql-data-reader.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ menu:
main:
parent: api
title: MySqlDataReader
weight: 30
weight: 40
---

MySqlDataReader
Expand Down
2 changes: 1 addition & 1 deletion docs/content/api/mysql-transaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ menu:
main:
parent: api
title: MySqlTransaction
weight: 40
weight: 50
---

MySqlTransaction
Expand Down
1 change: 1 addition & 0 deletions docs/content/tutorials/migrating-from-connector-net.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ for various precondition checks that indicate misuse of the API (and not a probl

The following bugs in Connector/NET are fixed by switching to MySqlConnector. (~~Strikethrough~~ indicates bugs that have since been fixed in a newer version of Connector/NET, but were fixed first in MySqlConnector.)

* [#14115](https://bugs.mysql.com/bug.php?id=14115): Compound statements are not supported by `MySqlCommand.Prepare`
* [#37283](https://bugs.mysql.com/bug.php?id=37283), [#70587](https://bugs.mysql.com/bug.php?id=70587): Distributed transactions are not supported
* [#50773](https://bugs.mysql.com/bug.php?id=50773): Can’t use multiple connections within one TransactionScope
* [#61477](https://bugs.mysql.com/bug.php?id=61477): `ColumnOrdinal` in schema table is 1-based
Expand Down
42 changes: 42 additions & 0 deletions src/MySqlConnector/Core/BatchedCommandPayloadCreator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using MySqlConnector.Protocol;
using MySqlConnector.Protocol.Serialization;

namespace MySqlConnector.Core
{
internal sealed class BatchedCommandPayloadCreator : ICommandPayloadCreator
{
public static ICommandPayloadCreator Instance { get; } = new BatchedCommandPayloadCreator();

public bool WriteQueryCommand(ref CommandListPosition commandListPosition, IDictionary<string, CachedProcedure> cachedProcedures, ByteBufferWriter writer)
{
writer.Write((byte) CommandKind.Multi);
bool? firstResult = default;
bool wroteCommand;
do
{
// save room for command length
var position = writer.Position;
writer.Write(Padding);

wroteCommand = SingleCommandPayloadCreator.Instance.WriteQueryCommand(ref commandListPosition, cachedProcedures, writer);
if (firstResult is null)
firstResult = wroteCommand;

// write command length
var commandLength = writer.Position - position - Padding.Length;
var span = writer.ArraySegment.AsSpan().Slice(position);
span[0] = 0xFE;
BinaryPrimitives.WriteUInt64LittleEndian(span.Slice(1), (ulong) commandLength);
} while (wroteCommand);

// remove the padding that was saved for the final command (which wasn't written)
writer.TrimEnd(Padding.Length);
return firstResult.Value;
}

static ReadOnlySpan<byte> Padding => new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 };
}
}
71 changes: 71 additions & 0 deletions src/MySqlConnector/Core/CommandExecutor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using MySql.Data.MySqlClient;
using MySqlConnector.Logging;
using MySqlConnector.Protocol.Serialization;
using MySqlConnector.Utilities;

namespace MySqlConnector.Core
{
internal static class CommandExecutor
{
public static async Task<DbDataReader> ExecuteReaderAsync(IReadOnlyList<IMySqlCommand> commands, ICommandPayloadCreator payloadCreator, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
var commandListPosition = new CommandListPosition(commands);
var command = commands[0];
if (Log.IsDebugEnabled())
Log.Debug("Session{0} ExecuteReader {1} CommandCount: {2}", command.Connection.Session.Id, ioBehavior, commands.Count);

Dictionary<string, CachedProcedure> cachedProcedures = null;
foreach (var command2 in commands)
{
if (command2.CommandType == CommandType.StoredProcedure)
{
if (cachedProcedures is null)
cachedProcedures = new Dictionary<string, CachedProcedure>();
if (!cachedProcedures.ContainsKey(command2.CommandText))
cachedProcedures.Add(command2.CommandText, await command2.Connection.GetCachedProcedure(ioBehavior, command2.CommandText, cancellationToken).ConfigureAwait(false));
}
}

var writer = new ByteBufferWriter();
if (!payloadCreator.WriteQueryCommand(ref commandListPosition, cachedProcedures, writer))
throw new InvalidOperationException("ICommandPayloadCreator failed to write query payload");

cancellationToken.ThrowIfCancellationRequested();

using (var payload = writer.ToPayloadData())
using (command.CancellableCommand.RegisterCancel(cancellationToken))
{
command.Connection.Session.StartQuerying(command.CancellableCommand);
command.SetLastInsertedId(-1);
try
{
await command.Connection.Session.SendAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
return await MySqlDataReader.CreateAsync(commandListPosition, payloadCreator, cachedProcedures, command, behavior, ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted && cancellationToken.IsCancellationRequested)
{
Log.Warn("Session{0} query was interrupted", command.Connection.Session.Id);
throw new OperationCanceledException(cancellationToken);
}
catch (Exception ex) when (payload.ArraySegment.Count > 4_194_304 && (ex is SocketException || ex is IOException || ex is MySqlProtocolException))
{
// the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
// use "decimal megabytes" (to round up) when creating the exception message
int megabytes = payload.ArraySegment.Count / 1_000_000;
throw new MySqlException("Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB.".FormatInvariant(megabytes), ex);
}
}
}

static readonly IMySqlConnectorLogger Log = MySqlConnectorLogManager.CreateLogger(nameof(CommandExecutor));
}
}
32 changes: 32 additions & 0 deletions src/MySqlConnector/Core/CommandListPosition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System.Collections.Generic;

namespace MySqlConnector.Core
{
/// <summary>
/// <see cref="CommandListPosition"/> encapsulates a list of <see cref="IMySqlCommand"/> and the current position within that list.
/// </summary>
internal struct CommandListPosition
{
public CommandListPosition(IReadOnlyList<IMySqlCommand> commands)
{
Commands = commands;
CommandIndex = 0;
PreparedStatementIndex = 0;
}

/// <summary>
/// The commands in the list.
/// </summary>
public IReadOnlyList<IMySqlCommand> Commands { get; }

/// <summary>
/// The index of the current command.
/// </summary>
public int CommandIndex;

/// <summary>
/// If the current command is a prepared statement, the index of the current prepared statement for that command.
/// </summary>
public int PreparedStatementIndex;
}
}
Loading

0 comments on commit 1ddaa43

Please sign in to comment.