From 71aad34357f9767bc5029dcc76f616144f403b4f Mon Sep 17 00:00:00 2001 From: Steven Schveighoffer Date: Mon, 22 May 2023 17:52:51 -0400 Subject: [PATCH] Attempt to detect when the server connection becomes stale. --- source/mysql/exceptions.d | 33 ++++++++ source/mysql/impl/connection.d | 12 +-- source/mysql/protocol/comms.d | 149 +++++++++++++++++++++------------ 3 files changed, 135 insertions(+), 59 deletions(-) diff --git a/source/mysql/exceptions.d b/source/mysql/exceptions.d index 64ec0bce..5803a0a9 100644 --- a/source/mysql/exceptions.d +++ b/source/mysql/exceptions.d @@ -156,3 +156,36 @@ class MYXInvalidatedRange: MYX super(msg, file, line); } } + +/++ +Thrown when a stale connection to the server is detected. + +To properly use this, it is suggested to use the following construct: + +---- +retry: +try { + conn.exec(...); // do the command + // or prepare, query, etc. +} +catch(MYXStaleConnection) +{ + goto retry; +} +---- + +In the future, when the protocol code is rewritten, this may become a built-in +feature so the user does not have to do this on their own. + +NOTE: this is a new mechanism to try and capture this case so user code can +properly handle the retry. Any bugs in this happening as an infinite loop, +please file an issue with the exact case. ++/ +class MYXStaleConnection: MYX +{ +@safe pure: + this(string msg, string file = __FILE__, size_t line = __LINE__) + { + super(msg, file, line); + } +} diff --git a/source/mysql/impl/connection.d b/source/mysql/impl/connection.d index f9dc33b6..b648372a 100644 --- a/source/mysql/impl/connection.d +++ b/source/mysql/impl/connection.d @@ -631,8 +631,8 @@ public: +/ void selectDB(string dbName) { - this.sendCmd(CommandType.INIT_DB, dbName); - this.getCmdResponse(); + auto packet = this.sendCmd(CommandType.INIT_DB, dbName); + this.getCmdResponse(packet); _db = dbName; } @@ -645,8 +645,8 @@ public: +/ OKErrorPacket pingServer() { - this.sendCmd(CommandType.PING, []); - return this.getCmdResponse(); + auto packet = this.sendCmd(CommandType.PING, []); + return this.getCmdResponse(packet); } /++ @@ -658,8 +658,8 @@ public: +/ OKErrorPacket refreshServer(RefreshFlags flags) { - this.sendCmd(CommandType.REFRESH, [flags]); - return this.getCmdResponse(); + auto packet = this.sendCmd(CommandType.REFRESH, [flags]); + return this.getCmdResponse(packet); } /++ diff --git a/source/mysql/protocol/comms.d b/source/mysql/protocol/comms.d index 3d37d88a..ec2a3ea2 100644 --- a/source/mysql/protocol/comms.d +++ b/source/mysql/protocol/comms.d @@ -391,36 +391,60 @@ package struct ProtocolPrepared } } - static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh, + static ubyte[] sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh, MySQLVal[] inParams, ParameterSpecialization[] psa) { - conn.autoPurge(); + ubyte[] impl() + { + conn.autoPurge(); - ubyte[] packet; - conn.resetPacket(); + ubyte[] packet; + conn.resetPacket(); + + ubyte[] prefix = makePSPrefix(hStmt, 0); + size_t len = prefix.length; + bool longData; + + if (psh.paramCount) + { + ubyte[] one = [ 1 ]; + ubyte[] vals; + ubyte[] types = analyseParams(inParams, psa, vals, longData); + ubyte[] nbm = makeBitmap(inParams); + packet = prefix ~ nbm ~ one ~ types ~ vals; + } + else + packet = prefix; - ubyte[] prefix = makePSPrefix(hStmt, 0); - size_t len = prefix.length; - bool longData; + if (longData) + sendLongData(conn._socket, hStmt, psa); - if (psh.paramCount) + assert(packet.length <= uint.max); + packet.setPacketHeader(conn.pktNumber); + conn.bumpPacket(); + conn._socket.send(packet); + + return conn.getPacket(); + } + + if(conn._socket.connected) { - ubyte[] one = [ 1 ]; - ubyte[] vals; - ubyte[] types = analyseParams(inParams, psa, vals, longData); - ubyte[] nbm = makeBitmap(inParams); - packet = prefix ~ nbm ~ one ~ types ~ vals; + try + { + return impl(); + } + catch(Exception) + { + // convert all exceptions here as stale connections. This might be the + // first try to send a request to the server on an + // already-existing connection. + throw new MYXStaleConnection("Possible stale connection"); + } } else - packet = prefix; - - if (longData) - sendLongData(conn._socket, hStmt, psa); - - assert(packet.length <= uint.max); - packet.setPacketHeader(conn.pktNumber); - conn.bumpPacket(); - conn._socket.send(packet); + { + return impl(); + } } } @@ -454,22 +478,21 @@ package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out u scope(failure) conn.kill(); // Send data + ubyte[] packet; if(info.isPrepared) { logTrace("prepared SQL: %s", info.hStmt); - ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa); + packet = ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa); } else { logTrace("exec query: %s", info.sql); - conn.sendCmd(CommandType.QUERY, info.sql); + packet = conn.sendCmd(CommandType.QUERY, info.sql); conn._fieldCount = 0; } - // Handle response - ubyte[] packet = conn.getPacket(); bool rv; if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error) { @@ -688,7 +711,8 @@ do _socket.write(data); } -package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data) +// returns the first packet received +package(mysql) ubyte[] sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data) in { // Internal thread states. Clients shouldn't use this @@ -709,39 +733,62 @@ in out { // at this point we should have sent a command - assert(conn.pktNumber == 1); + //assert(conn.pktNumber == 1); } do { - scope(failure) conn.kill(); - conn._lastCommandID++; - if(!conn._socket.connected) + ubyte[] impl() { - if(cmd == CommandType.QUIT) - return; // Don't bother reopening connection just to quit + scope(failure) conn.kill(); - conn._open = Connection.OpenState.notConnected; - conn.connect(conn._clientCapabilities); - } + conn.autoPurge(); - conn.autoPurge(); + conn.resetPacket(); - conn.resetPacket(); + ubyte[] header; + header.length = 4 /*header*/ + 1 /*cmd*/; + header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/); + header[4] = cmd; + conn.bumpPacket(); - ubyte[] header; - header.length = 4 /*header*/ + 1 /*cmd*/; - header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/); - header[4] = cmd; - conn.bumpPacket(); + conn._socket.send(header, cast(const(ubyte)[])data); + + // now, get the first packet, but only if the command is not QUIT + if(cmd == CommandType.QUIT) + return null; + + return conn.getPacket(); + } + + if(conn._socket.connected) + { + try + { + return impl(); + } + catch(Exception) + { + // convert all exceptions here as stale connections. This is the + // first try to send a request to the server on an already-existing connection. + throw new MYXStaleConnection("Possible stale connection"); + } + } + else + { + if(cmd == CommandType.QUIT) + return null; // Don't bother reopening connection just to quit - conn._socket.send(header, cast(const(ubyte)[])data); + conn._open = Connection.OpenState.notConnected; + conn.connect(conn._clientCapabilities); + return impl(); + } } -package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false) +package(mysql) OKErrorPacket getCmdResponse(Connection conn, ubyte[] packet) { - auto okp = OKErrorPacket(conn.getPacket()); + auto okp = OKErrorPacket(packet); enforcePacketOK(okp); conn._serverStatus = okp.serverStatus; return okp; @@ -960,10 +1007,9 @@ package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[]) PreparedServerInfo info; - conn.sendCmd(CommandType.STMT_PREPARE, sql); + ubyte[] packet = conn.sendCmd(CommandType.STMT_PREPARE, sql); conn._fieldCount = 0; - ubyte[] packet = conn.getPacket(); if(packet.front == ResultPacketMarker.ok) { packet.popFront(); @@ -1046,8 +1092,7 @@ Get a textual report on the server status. +/ package(mysql) string serverStats(Connection conn) { - conn.sendCmd(CommandType.STATISTICS, []); - auto result = conn.getPacket(); + auto result = conn.sendCmd(CommandType.STATISTICS, []); return (() @trusted => cast(string)result)(); } @@ -1071,10 +1116,8 @@ package(mysql) void enableMultiStatements(Connection conn, bool on) t.length = 2; t[0] = on ? 0 : 1; t[1] = 0; - conn.sendCmd(CommandType.STMT_OPTION, t); - // For some reason this command gets an EOF packet as response - auto packet = conn.getPacket(); + auto packet = conn.sendCmd(CommandType.STMT_OPTION, t); enforce!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command"); }