Skip to content

Commit

Permalink
Merge pull request #278 from schveiguy/fixserverdrop
Browse files Browse the repository at this point in the history
Attempt to detect when the server connection becomes stale.
  • Loading branch information
schveiguy authored May 22, 2023
2 parents 1bcf0bc + 71aad34 commit 57fbc2b
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 59 deletions.
33 changes: 33 additions & 0 deletions source/mysql/exceptions.d
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,36 @@ class MYXInvalidatedRange: MYX
super(msg, file, line);
}
}

/++
Thrown when a stale connection to the server is detected.
To properly use this, it is suggested to use the following construct:
----
retry:
try {
conn.exec(...); // do the command
// or prepare, query, etc.
}
catch(MYXStaleConnection)
{
goto retry;
}
----
In the future, when the protocol code is rewritten, this may become a built-in
feature so the user does not have to do this on their own.
NOTE: this is a new mechanism to try and capture this case so user code can
properly handle the retry. Any bugs in this happening as an infinite loop,
please file an issue with the exact case.
+/
class MYXStaleConnection: MYX
{
@safe pure:
this(string msg, string file = __FILE__, size_t line = __LINE__)
{
super(msg, file, line);
}
}
12 changes: 6 additions & 6 deletions source/mysql/impl/connection.d
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,8 @@ public:
+/
void selectDB(string dbName)
{
this.sendCmd(CommandType.INIT_DB, dbName);
this.getCmdResponse();
auto packet = this.sendCmd(CommandType.INIT_DB, dbName);
this.getCmdResponse(packet);
_db = dbName;
}

Expand All @@ -645,8 +645,8 @@ public:
+/
OKErrorPacket pingServer()
{
this.sendCmd(CommandType.PING, []);
return this.getCmdResponse();
auto packet = this.sendCmd(CommandType.PING, []);
return this.getCmdResponse(packet);
}

/++
Expand All @@ -658,8 +658,8 @@ public:
+/
OKErrorPacket refreshServer(RefreshFlags flags)
{
this.sendCmd(CommandType.REFRESH, [flags]);
return this.getCmdResponse();
auto packet = this.sendCmd(CommandType.REFRESH, [flags]);
return this.getCmdResponse(packet);
}

/++
Expand Down
149 changes: 96 additions & 53 deletions source/mysql/protocol/comms.d
Original file line number Diff line number Diff line change
Expand Up @@ -391,36 +391,60 @@ package struct ProtocolPrepared
}
}

static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
static ubyte[] sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
MySQLVal[] inParams, ParameterSpecialization[] psa)
{
conn.autoPurge();
ubyte[] impl()
{
conn.autoPurge();

ubyte[] packet;
conn.resetPacket();
ubyte[] packet;
conn.resetPacket();

ubyte[] prefix = makePSPrefix(hStmt, 0);
size_t len = prefix.length;
bool longData;

if (psh.paramCount)
{
ubyte[] one = [ 1 ];
ubyte[] vals;
ubyte[] types = analyseParams(inParams, psa, vals, longData);
ubyte[] nbm = makeBitmap(inParams);
packet = prefix ~ nbm ~ one ~ types ~ vals;
}
else
packet = prefix;

ubyte[] prefix = makePSPrefix(hStmt, 0);
size_t len = prefix.length;
bool longData;
if (longData)
sendLongData(conn._socket, hStmt, psa);

if (psh.paramCount)
assert(packet.length <= uint.max);
packet.setPacketHeader(conn.pktNumber);
conn.bumpPacket();
conn._socket.send(packet);

return conn.getPacket();
}

if(conn._socket.connected)
{
ubyte[] one = [ 1 ];
ubyte[] vals;
ubyte[] types = analyseParams(inParams, psa, vals, longData);
ubyte[] nbm = makeBitmap(inParams);
packet = prefix ~ nbm ~ one ~ types ~ vals;
try
{
return impl();
}
catch(Exception)
{
// convert all exceptions here as stale connections. This might be the
// first try to send a request to the server on an
// already-existing connection.
throw new MYXStaleConnection("Possible stale connection");
}
}
else
packet = prefix;

if (longData)
sendLongData(conn._socket, hStmt, psa);

assert(packet.length <= uint.max);
packet.setPacketHeader(conn.pktNumber);
conn.bumpPacket();
conn._socket.send(packet);
{
return impl();
}
}
}

Expand Down Expand Up @@ -454,22 +478,21 @@ package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out u
scope(failure) conn.kill();

// Send data
ubyte[] packet;
if(info.isPrepared)
{
logTrace("prepared SQL: %s", info.hStmt);

ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);
packet = ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);
}
else
{
logTrace("exec query: %s", info.sql);

conn.sendCmd(CommandType.QUERY, info.sql);
packet = conn.sendCmd(CommandType.QUERY, info.sql);
conn._fieldCount = 0;
}

// Handle response
ubyte[] packet = conn.getPacket();
bool rv;
if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error)
{
Expand Down Expand Up @@ -688,7 +711,8 @@ do
_socket.write(data);
}

package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
// returns the first packet received
package(mysql) ubyte[] sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
in
{
// Internal thread states. Clients shouldn't use this
Expand All @@ -709,39 +733,62 @@ in
out
{
// at this point we should have sent a command
assert(conn.pktNumber == 1);
//assert(conn.pktNumber == 1);
}
do
{
scope(failure) conn.kill();

conn._lastCommandID++;

if(!conn._socket.connected)
ubyte[] impl()
{
if(cmd == CommandType.QUIT)
return; // Don't bother reopening connection just to quit
scope(failure) conn.kill();

conn._open = Connection.OpenState.notConnected;
conn.connect(conn._clientCapabilities);
}
conn.autoPurge();

conn.autoPurge();
conn.resetPacket();

conn.resetPacket();
ubyte[] header;
header.length = 4 /*header*/ + 1 /*cmd*/;
header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
header[4] = cmd;
conn.bumpPacket();

ubyte[] header;
header.length = 4 /*header*/ + 1 /*cmd*/;
header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
header[4] = cmd;
conn.bumpPacket();
conn._socket.send(header, cast(const(ubyte)[])data);

// now, get the first packet, but only if the command is not QUIT
if(cmd == CommandType.QUIT)
return null;

return conn.getPacket();
}

if(conn._socket.connected)
{
try
{
return impl();
}
catch(Exception)
{
// convert all exceptions here as stale connections. This is the
// first try to send a request to the server on an already-existing connection.
throw new MYXStaleConnection("Possible stale connection");
}
}
else
{
if(cmd == CommandType.QUIT)
return null; // Don't bother reopening connection just to quit

conn._socket.send(header, cast(const(ubyte)[])data);
conn._open = Connection.OpenState.notConnected;
conn.connect(conn._clientCapabilities);
return impl();
}
}

package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false)
package(mysql) OKErrorPacket getCmdResponse(Connection conn, ubyte[] packet)
{
auto okp = OKErrorPacket(conn.getPacket());
auto okp = OKErrorPacket(packet);
enforcePacketOK(okp);
conn._serverStatus = okp.serverStatus;
return okp;
Expand Down Expand Up @@ -960,10 +1007,9 @@ package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[])

PreparedServerInfo info;

conn.sendCmd(CommandType.STMT_PREPARE, sql);
ubyte[] packet = conn.sendCmd(CommandType.STMT_PREPARE, sql);
conn._fieldCount = 0;

ubyte[] packet = conn.getPacket();
if(packet.front == ResultPacketMarker.ok)
{
packet.popFront();
Expand Down Expand Up @@ -1046,8 +1092,7 @@ Get a textual report on the server status.
+/
package(mysql) string serverStats(Connection conn)
{
conn.sendCmd(CommandType.STATISTICS, []);
auto result = conn.getPacket();
auto result = conn.sendCmd(CommandType.STATISTICS, []);
return (() @trusted => cast(string)result)();
}

Expand All @@ -1071,10 +1116,8 @@ package(mysql) void enableMultiStatements(Connection conn, bool on)
t.length = 2;
t[0] = on ? 0 : 1;
t[1] = 0;
conn.sendCmd(CommandType.STMT_OPTION, t);

// For some reason this command gets an EOF packet as response
auto packet = conn.getPacket();
auto packet = conn.sendCmd(CommandType.STMT_OPTION, t);
enforce!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
}

Expand Down

0 comments on commit 57fbc2b

Please sign in to comment.