Skip to content

Commit

Permalink
Fix issues/FR #2120 , #2121 and #2125
Browse files Browse the repository at this point in the history
Issue #2120 : Send SESSION_TRACK_GTIDS to client
Issue #2121 : Track CLIENT_FOUND_ROWS required by the client
Issue #2125 : Track CLIENT_MULTI_STATEMENTS required by the client
  • Loading branch information
renecannao committed Jul 19, 2019
1 parent 334cbdb commit 6691a86
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 26 deletions.
4 changes: 4 additions & 0 deletions include/MySQL_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ class MySQL_Session

bool with_gtid;

char gtid_buf[128];
//uint64_t gtid_trxid;
int gtid_hid;

MySQL_STMTs_meta *sess_STMTs_meta;
StmtLongDataHandler *SLDH;

Expand Down
1 change: 1 addition & 0 deletions include/MySQL_Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ class MySQL_Threads_Handler
bool stats_time_query_processor;
bool query_cache_stores_empty_result;
bool kill_backend_connection_when_disconnect;
bool client_session_track_gtid;
} variables;
struct {
unsigned int mirror_sessions_current;
Expand Down
2 changes: 1 addition & 1 deletion include/mysql_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class MySQL_Backend
void * operator new(size_t);
void operator delete(void *);
int hostgroup_id;
char gtid_uuid[64];
char gtid_uuid[128];
uint64_t gtid_trxid;
MySQL_Data_Stream *server_myds;
// mysql_cp_entry_t *server_mycpe;
Expand Down
2 changes: 2 additions & 0 deletions include/mysql_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,7 @@ class MySQL_Connection {

bool get_gtid(char *buff, uint64_t *trx_id);
void reduce_auto_increment_delay_token() { if (auto_increment_delay_token) auto_increment_delay_token--; };

bool match_tracked_options(MySQL_Connection *c);
};
#endif /* __CLASS_MYSQL_CONNECTION_H */
2 changes: 2 additions & 0 deletions include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ __thread bool mysql_thread___sessions_sort;
__thread bool mysql_thread___session_idle_ms;
__thread int mysql_thread___hostgroup_manager_verbose;
__thread bool mysql_thread___kill_backend_connection_when_disconnect;
__thread bool mysql_thread___client_session_track_gtid;

/* variables used for Query Cache */
__thread int mysql_thread___query_cache_size_MB;
Expand Down Expand Up @@ -805,6 +806,7 @@ extern __thread bool mysql_thread___sessions_sort;
extern __thread bool mysql_thread___session_idle_ms;
extern __thread int mysql_thread___hostgroup_manager_verbose;
extern __thread bool mysql_thread___kill_backend_connection_when_disconnect;
extern __thread bool mysql_thread___client_session_track_gtid;

/* variables used for Query Cache */
extern __thread int mysql_thread___query_cache_size_MB;
Expand Down
47 changes: 36 additions & 11 deletions lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2614,37 +2614,60 @@ MySQL_Connection * MySrvConnList::get_random_MyConn(MySQL_Session *sess, bool ff
// try to match schemaname AND username
char *schema = sess->client_myds->myconn->userinfo->schemaname;
char *username = sess->client_myds->myconn->userinfo->username;
MySQL_Connection * client_conn = sess->client_myds->myconn;
bool conn_found = false;
unsigned int k;
unsigned int options_matching_idx;
bool options_matching_found = false;
for (k = i; conn_found == false && k < l; k++) {
conn = (MySQL_Connection *)conns->index(k);
if (strcmp(conn->userinfo->schemaname,schema)==0 && strcmp(conn->userinfo->username,username)==0) {
conn_found = true;
i = k;
if (conn->match_tracked_options(client_conn)) {
if (options_matching_found == false) {
options_matching_found = true;
options_matching_idx = k;
}
if (strcmp(conn->userinfo->schemaname,schema)==0 && strcmp(conn->userinfo->username,username)==0) {
conn_found = true;
i = k;
}
}
}
if (conn_found == false ) {
for (k = 0; conn_found == false && k < i; k++) {
conn = (MySQL_Connection *)conns->index(k);
if (strcmp(conn->userinfo->schemaname,schema)==0 && strcmp(conn->userinfo->username,username)==0) {
conn_found = true;
i = k;
if (conn->match_tracked_options(client_conn)) {
if (options_matching_found == false) {
options_matching_found = true;
options_matching_idx = k;
}
if (strcmp(conn->userinfo->schemaname,schema)==0 && strcmp(conn->userinfo->username,username)==0) {
conn_found = true;
i = k;
}
}
}
}
if (conn_found == true) {
conn=(MySQL_Connection *)conns->remove_index_fast(i);
} else {
// we may consider creating a new connection
unsigned int conns_free = mysrvc->ConnectionsFree->conns_length();
unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length();
if ((conns_used > conns_free) && (mysrvc->max_connections > (conns_free/2 + conns_used/2)) ) {
if (options_matching_found == false) {
// we must create a new connection
conn = new MySQL_Connection();
conn->parent=mysrvc;
__sync_fetch_and_add(&MyHGM->status.server_connections_created, 1);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
} else {
conn=(MySQL_Connection *)conns->remove_index_fast(i);
// we may consider creating a new connection
unsigned int conns_free = mysrvc->ConnectionsFree->conns_length();
unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length();
if ((conns_used > conns_free) && (mysrvc->max_connections > (conns_free/2 + conns_used/2)) ) {
conn = new MySQL_Connection();
conn->parent=mysrvc;
__sync_fetch_and_add(&MyHGM->status.server_connections_created, 1);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
} else {
conn=(MySQL_Connection *)conns->remove_index_fast(i);
}
}
}
} else {
Expand Down Expand Up @@ -3016,6 +3039,8 @@ SQLite3_result * MySQL_HostGroups_Manager::SQL3_Free_Connections() {
j["charset"] = _my->charset->nr;
j["options"]["charset_name"] = _my->options.charset_name;
j["options"]["use_ssl"] = _my->options.use_ssl;
j["client_flag"]["client_found_rows"] = (_my->client_flag & CLIENT_FOUND_ROWS);
j["client_flag"]["client_multi_statements"] = (_my->client_flag & CLIENT_MULTI_STATEMENTS);
j["net"]["last_errno"] = _my->net.last_errno;
j["net"]["fd"] = _my->net.fd;
j["net"]["max_packet_size"] = _my->net.max_packet_size;
Expand Down
49 changes: 45 additions & 4 deletions lib/MySQL_Protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,14 +609,40 @@ bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, u
char msg_prefix;
uint8_t msg_len_len=mysql_encode_length(msg_len, &msg_prefix);

bool client_session_track=false;
//char gtid_buf[128];
char gtid_prefix;
uint8_t gtid_len=0;
uint8_t gtid_len_len=0;

mysql_hdr myhdr;
myhdr.pkt_id=sequence_id;
myhdr.pkt_length=1+affected_rows_len+last_insert_id_len+sizeof(uint16_t)+sizeof(uint16_t)+msg_len;
if (msg_len) myhdr.pkt_length+=msg_len_len;
unsigned int size=myhdr.pkt_length+sizeof(mysql_hdr);
unsigned char *_ptr=(unsigned char *)l_alloc(size);
memcpy(_ptr, &myhdr, sizeof(mysql_hdr));
int l=sizeof(mysql_hdr);

if (*myds && (*myds)->myconn) {
if ((*myds)->myconn->options.client_flag & CLIENT_SESSION_TRACKING) {
if (mysql_thread___client_session_track_gtid) {
if (sess) {
if (sess->gtid_hid >= 0) {
myhdr.pkt_length++;
client_session_track=true;
gtid_len = strlen(sess->gtid_buf);
gtid_len_len = mysql_encode_length(gtid_len, &gtid_prefix);
myhdr.pkt_length += gtid_len_len;
myhdr.pkt_length += gtid_len;
myhdr.pkt_length += 4; // headers related to GTID
}
}
}
}
}


unsigned int size=myhdr.pkt_length+sizeof(mysql_hdr);
unsigned char *_ptr=(unsigned char *)l_alloc(size);
memcpy(_ptr, &myhdr, sizeof(mysql_hdr));
int l=sizeof(mysql_hdr);
_ptr[l]=0x00; l++;
l+=write_encoded_length(_ptr+l, affected_rows, affected_rows_len, affected_rows_prefix);
l+=write_encoded_length(_ptr+l, last_insert_id, last_insert_id_len, last_insert_id_prefix);
Expand Down Expand Up @@ -647,6 +673,21 @@ bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, u
l+=write_encoded_length(_ptr+l, msg_len, msg_len_len, msg_prefix);
memcpy(_ptr+l, msg, msg_len);
}
l+=msg_len;
if (client_session_track == true) {
_ptr[l]=0x00; l++;
if (gtid_len) {
unsigned char gtid_prefix_h1 = gtid_prefix+2;
unsigned char state_change_prefix = gtid_prefix_h1+2;
_ptr[l] = state_change_prefix; l++;
_ptr[l]=0x03; l++; // SESSION_TRACK_GTIDS
_ptr[l] = gtid_prefix_h1; l++;
_ptr[l]=0x00; l++;
// l+=write_encoded_length(_ptr+l, gtid_len, gtid_len_len, gtid_prefix); // overcomplicated
_ptr[l] = gtid_prefix; l++;
memcpy(_ptr+l, sess->gtid_buf, gtid_len);
}
}
if (send==true) {
(*myds)->PSarrayOUT->add((void *)_ptr,size);
switch ((*myds)->DSS) {
Expand Down
24 changes: 23 additions & 1 deletion lib/MySQL_Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,10 @@ MySQL_Session::MySQL_Session() {
with_gtid = false;
use_ssl = false;

//gtid_trxid = 0;
gtid_hid = -1;
memset(gtid_buf,0,sizeof(gtid_buf));

match_regexes=NULL;
/*
match_regexes=(Session_Regex **)malloc(sizeof(Session_Regex *)*3);
Expand Down Expand Up @@ -464,6 +468,12 @@ void MySQL_Session::reset() {
mybes=NULL;
}
mybe=NULL;

with_gtid = false;

//gtid_trxid = 0;
gtid_hid = -1;
memset(gtid_buf,0,sizeof(gtid_buf));
}

MySQL_Session::~MySQL_Session() {
Expand Down Expand Up @@ -839,6 +849,8 @@ void MySQL_Session::generate_proxysql_internal_session_json(json &j) {
j["autocommit_on_hostgroup"] = autocommit_on_hostgroup;
j["last_insert_id"] = last_insert_id;
j["last_HG_affected_rows"] = last_HG_affected_rows;
j["gtid"]["hid"] = gtid_hid;
j["gtid"]["last"] = ( strlen(gtid_buf) ? gtid_buf : "" );
j["client"]["userinfo"]["username"] = ( client_myds->myconn->userinfo->username ? client_myds->myconn->userinfo->username : "" );
#ifdef DEBUG
j["client"]["userinfo"]["password"] = ( client_myds->myconn->userinfo->password ? client_myds->myconn->userinfo->password : "" );
Expand All @@ -860,6 +872,7 @@ void MySQL_Session::generate_proxysql_internal_session_json(json &j) {
j["conn"]["charset"] = client_myds->myconn->options.charset;
j["conn"]["sql_log_bin"] = client_myds->myconn->options.sql_log_bin;
j["conn"]["autocommit"] = client_myds->myconn->options.autocommit;
j["conn"]["client_flag"] = client_myds->myconn->options.client_flag;
j["conn"]["no_backslash_escapes"] = client_myds->myconn->options.no_backslash_escapes;
j["conn"]["status"]["compression"] = client_myds->myconn->get_status_compression();
j["conn"]["status"]["transaction"] = client_myds->myconn->get_status_transaction();
Expand All @@ -869,6 +882,7 @@ void MySQL_Session::generate_proxysql_internal_session_json(json &j) {
_mybe=(MySQL_Backend *)mybes->index(k);
unsigned int i = _mybe->hostgroup_id;
j["backends"][i]["hostgroup_id"] = i;
j["backends"][i]["gtid"] = ( strlen(_mybe->gtid_uuid) ? _mybe->gtid_uuid : "" );
if (_mybe->server_myds) {
MySQL_Data_Stream *_myds=_mybe->server_myds;
sprintf(buff,"%p",_myds);
Expand All @@ -887,6 +901,7 @@ void MySQL_Session::generate_proxysql_internal_session_json(json &j) {
MySQL_Connection * _myconn = _myds->myconn;
sprintf(buff,"%p",_myconn);
j["backends"][i]["conn"]["address"] = buff;
j["backends"][i]["conn"]["auto_increment_delay_token"] = _myconn->auto_increment_delay_token;
j["backends"][i]["conn"]["bytes_recv"] = _myconn->bytes_info.bytes_recv;
j["backends"][i]["conn"]["bytes_sent"] = _myconn->bytes_info.bytes_sent;
j["backends"][i]["conn"]["questions"] = _myconn->statuses.questions;
Expand Down Expand Up @@ -3350,8 +3365,15 @@ int MySQL_Session::handler() {
(endt.tv_sec*1000000000+endt.tv_nsec) -
(begint.tv_sec*1000000000+begint.tv_nsec);
}
gtid_hid = -1;
if (rc==0) {
myconn->get_gtid(mybe->gtid_uuid,&mybe->gtid_trxid);
if (myconn->get_gtid(mybe->gtid_uuid,&mybe->gtid_trxid)) {
if (mysql_thread___client_session_track_gtid) {
gtid_hid = current_hostgroup;
memcpy(gtid_buf,mybe->gtid_uuid,sizeof(gtid_buf));
}
}

// check if multiplexing needs to be disabled
char *qdt=CurrentQuery.get_digest_text();
if (qdt)
Expand Down
18 changes: 18 additions & 0 deletions lib/MySQL_Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"server_version",
(char *)"keep_multiplexing_variables",
(char *)"kill_backend_connection_when_disconnect",
(char *)"client_session_track_gtid",
(char *)"sessions_sort",
#ifdef IDLE_THREADS
(char *)"session_idle_show_processlist",
Expand Down Expand Up @@ -461,6 +462,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.stats_time_query_processor=false;
variables.query_cache_stores_empty_result=true;
variables.kill_backend_connection_when_disconnect=true;
variables.client_session_track_gtid=true;
variables.sessions_sort=true;
#ifdef IDLE_THREADS
variables.session_idle_ms=1000;
Expand Down Expand Up @@ -804,6 +806,7 @@ int MySQL_Threads_Handler::get_variable_int(const char *name) {
if (!strcmp(name,"stats_time_query_processor")) return (int)variables.stats_time_query_processor;
if (!strcmp(name,"query_cache_stores_empty_result")) return (int)variables.query_cache_stores_empty_result;
if (!strcmp(name,"kill_backend_connection_when_disconnect")) return (int)variables.kill_backend_connection_when_disconnect;
if (!strcmp(name,"client_session_track_gtid")) return (int)variables.client_session_track_gtid;
if (!strcmp(name,"sessions_sort")) return (int)variables.sessions_sort;
#ifdef IDLE_THREADS
if (!strcmp(name,"session_idle_show_processlist")) return (int)variables.session_idle_show_processlist;
Expand Down Expand Up @@ -1270,6 +1273,9 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f
if (!strcasecmp(name,"kill_backend_connection_when_disconnect")) {
return strdup((variables.kill_backend_connection_when_disconnect ? "true" : "false"));
}
if (!strcasecmp(name,"client_session_track_gtid")) {
return strdup((variables.client_session_track_gtid ? "true" : "false"));
}
if (!strcasecmp(name,"sessions_sort")) {
return strdup((variables.sessions_sort ? "true" : "false"));
}
Expand Down Expand Up @@ -2501,6 +2507,17 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t
}
return false;
}
if (!strcasecmp(name,"client_session_track_gtid")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.client_session_track_gtid=true;
return true;
}
if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
variables.client_session_track_gtid=false;
return true;
}
return false;
}
if (!strcasecmp(name,"servers_stats")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.servers_stats=true;
Expand Down Expand Up @@ -3934,6 +3951,7 @@ void MySQL_Thread::refresh_variables() {
variables.query_cache_stores_empty_result=(bool)GloMTH->get_variable_int((char *)"query_cache_stores_empty_result");
mysql_thread___hostgroup_manager_verbose = GloMTH->get_variable_int((char *)"hostgroup_manager_verbose");
mysql_thread___kill_backend_connection_when_disconnect=(bool)GloMTH->get_variable_int((char *)"kill_backend_connection_when_disconnect");
mysql_thread___client_session_track_gtid=(bool)GloMTH->get_variable_int((char *)"client_session_track_gtid");
mysql_thread___sessions_sort=(bool)GloMTH->get_variable_int((char *)"sessions_sort");
#ifdef IDLE_THREADS
mysql_thread___session_idle_show_processlist=(bool)GloMTH->get_variable_int((char *)"session_idle_show_processlist");
Expand Down
Loading

0 comments on commit 6691a86

Please sign in to comment.