Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt to detect when the server connection becomes stale. #278

Merged
merged 1 commit into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions source/mysql/exceptions.d
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,36 @@ class MYXInvalidatedRange: MYX
super(msg, file, line);
}
}

/++
Thrown when a stale connection to the server is detected.

To properly use this, it is suggested to use the following construct:

----
retry:
try {
conn.exec(...); // do the command
// or prepare, query, etc.
}
catch(MYXStaleConnection)
{
goto retry;
}
----

In the future, when the protocol code is rewritten, this may become a built-in
feature so the user does not have to do this on their own.

NOTE: this is a new mechanism to try and capture this case so user code can
properly handle the retry. Any bugs in this happening as an infinite loop,
please file an issue with the exact case.
+/
class MYXStaleConnection: MYX
{
@safe pure:
this(string msg, string file = __FILE__, size_t line = __LINE__)
{
super(msg, file, line);
}
}
12 changes: 6 additions & 6 deletions source/mysql/impl/connection.d
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,8 @@ public:
+/
void selectDB(string dbName)
{
this.sendCmd(CommandType.INIT_DB, dbName);
this.getCmdResponse();
auto packet = this.sendCmd(CommandType.INIT_DB, dbName);
this.getCmdResponse(packet);
_db = dbName;
}

Expand All @@ -645,8 +645,8 @@ public:
+/
OKErrorPacket pingServer()
{
this.sendCmd(CommandType.PING, []);
return this.getCmdResponse();
auto packet = this.sendCmd(CommandType.PING, []);
return this.getCmdResponse(packet);
}

/++
Expand All @@ -658,8 +658,8 @@ public:
+/
OKErrorPacket refreshServer(RefreshFlags flags)
{
this.sendCmd(CommandType.REFRESH, [flags]);
return this.getCmdResponse();
auto packet = this.sendCmd(CommandType.REFRESH, [flags]);
return this.getCmdResponse(packet);
}

/++
Expand Down
149 changes: 96 additions & 53 deletions source/mysql/protocol/comms.d
Original file line number Diff line number Diff line change
Expand Up @@ -391,36 +391,60 @@ package struct ProtocolPrepared
}
}

static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
static ubyte[] sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
MySQLVal[] inParams, ParameterSpecialization[] psa)
{
conn.autoPurge();
ubyte[] impl()
{
conn.autoPurge();

ubyte[] packet;
conn.resetPacket();
ubyte[] packet;
conn.resetPacket();

ubyte[] prefix = makePSPrefix(hStmt, 0);
size_t len = prefix.length;
bool longData;

if (psh.paramCount)
{
ubyte[] one = [ 1 ];
ubyte[] vals;
ubyte[] types = analyseParams(inParams, psa, vals, longData);
ubyte[] nbm = makeBitmap(inParams);
packet = prefix ~ nbm ~ one ~ types ~ vals;
}
else
packet = prefix;

ubyte[] prefix = makePSPrefix(hStmt, 0);
size_t len = prefix.length;
bool longData;
if (longData)
sendLongData(conn._socket, hStmt, psa);

if (psh.paramCount)
assert(packet.length <= uint.max);
packet.setPacketHeader(conn.pktNumber);
conn.bumpPacket();
conn._socket.send(packet);

return conn.getPacket();
}

if(conn._socket.connected)
{
ubyte[] one = [ 1 ];
ubyte[] vals;
ubyte[] types = analyseParams(inParams, psa, vals, longData);
ubyte[] nbm = makeBitmap(inParams);
packet = prefix ~ nbm ~ one ~ types ~ vals;
try
{
return impl();
}
catch(Exception)
{
// convert all exceptions here as stale connections. This might be the
// first try to send a request to the server on an
// already-existing connection.
throw new MYXStaleConnection("Possible stale connection");
}
}
else
packet = prefix;

if (longData)
sendLongData(conn._socket, hStmt, psa);

assert(packet.length <= uint.max);
packet.setPacketHeader(conn.pktNumber);
conn.bumpPacket();
conn._socket.send(packet);
{
return impl();
}
}
}

Expand Down Expand Up @@ -454,22 +478,21 @@ package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out u
scope(failure) conn.kill();

// Send data
ubyte[] packet;
if(info.isPrepared)
{
logTrace("prepared SQL: %s", info.hStmt);

ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);
packet = ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);
}
else
{
logTrace("exec query: %s", info.sql);

conn.sendCmd(CommandType.QUERY, info.sql);
packet = conn.sendCmd(CommandType.QUERY, info.sql);
conn._fieldCount = 0;
}

// Handle response
ubyte[] packet = conn.getPacket();
bool rv;
if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error)
{
Expand Down Expand Up @@ -688,7 +711,8 @@ do
_socket.write(data);
}

package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
// returns the first packet received
package(mysql) ubyte[] sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
in
{
// Internal thread states. Clients shouldn't use this
Expand All @@ -709,39 +733,62 @@ in
out
{
// at this point we should have sent a command
assert(conn.pktNumber == 1);
//assert(conn.pktNumber == 1);
}
do
{
scope(failure) conn.kill();

conn._lastCommandID++;

if(!conn._socket.connected)
ubyte[] impl()
{
if(cmd == CommandType.QUIT)
return; // Don't bother reopening connection just to quit
scope(failure) conn.kill();

conn._open = Connection.OpenState.notConnected;
conn.connect(conn._clientCapabilities);
}
conn.autoPurge();

conn.autoPurge();
conn.resetPacket();

conn.resetPacket();
ubyte[] header;
header.length = 4 /*header*/ + 1 /*cmd*/;
header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
header[4] = cmd;
conn.bumpPacket();

ubyte[] header;
header.length = 4 /*header*/ + 1 /*cmd*/;
header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
header[4] = cmd;
conn.bumpPacket();
conn._socket.send(header, cast(const(ubyte)[])data);

// now, get the first packet, but only if the command is not QUIT
if(cmd == CommandType.QUIT)
return null;

return conn.getPacket();
}

if(conn._socket.connected)
{
try
{
return impl();
}
catch(Exception)
{
// convert all exceptions here as stale connections. This is the
// first try to send a request to the server on an already-existing connection.
throw new MYXStaleConnection("Possible stale connection");
}
}
else
{
if(cmd == CommandType.QUIT)
return null; // Don't bother reopening connection just to quit

conn._socket.send(header, cast(const(ubyte)[])data);
conn._open = Connection.OpenState.notConnected;
conn.connect(conn._clientCapabilities);
return impl();
}
}

package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false)
package(mysql) OKErrorPacket getCmdResponse(Connection conn, ubyte[] packet)
{
auto okp = OKErrorPacket(conn.getPacket());
auto okp = OKErrorPacket(packet);
enforcePacketOK(okp);
conn._serverStatus = okp.serverStatus;
return okp;
Expand Down Expand Up @@ -960,10 +1007,9 @@ package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[])

PreparedServerInfo info;

conn.sendCmd(CommandType.STMT_PREPARE, sql);
ubyte[] packet = conn.sendCmd(CommandType.STMT_PREPARE, sql);
conn._fieldCount = 0;

ubyte[] packet = conn.getPacket();
if(packet.front == ResultPacketMarker.ok)
{
packet.popFront();
Expand Down Expand Up @@ -1046,8 +1092,7 @@ Get a textual report on the server status.
+/
package(mysql) string serverStats(Connection conn)
{
conn.sendCmd(CommandType.STATISTICS, []);
auto result = conn.getPacket();
auto result = conn.sendCmd(CommandType.STATISTICS, []);
return (() @trusted => cast(string)result)();
}

Expand All @@ -1071,10 +1116,8 @@ package(mysql) void enableMultiStatements(Connection conn, bool on)
t.length = 2;
t[0] = on ? 0 : 1;
t[1] = 0;
conn.sendCmd(CommandType.STMT_OPTION, t);

// For some reason this command gets an EOF packet as response
auto packet = conn.getPacket();
auto packet = conn.sendCmd(CommandType.STMT_OPTION, t);
enforce!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
}

Expand Down