Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced new HG attribute 'monitor_slave_lag_when_null' which takes precedence over 'mysql_thread_monitor_slave_lag_when_null' #4528

Merged
merged 6 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions include/MySQL_HostGroups_Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ class MyHGC { // MySQL Host Group Container
char * ignore_session_variables_text; // this is the original version (text format) of ignore_session_variables
uint32_t max_num_online_servers;
uint32_t throttle_connections_per_sec;
int32_t monitor_slave_lag_when_null;
int8_t autocommit;
int8_t free_connections_pct;
int8_t handle_warnings;
Expand All @@ -310,6 +311,10 @@ class MyHGC { // MySQL Host Group Container
bool handle_warnings_enabled() const {
return attributes.configured == true && attributes.handle_warnings != -1 ? attributes.handle_warnings : mysql_thread___handle_warnings;
}
inline
int32_t get_monitor_slave_lag_when_null() const {
return attributes.configured == true && attributes.monitor_slave_lag_when_null != -1 ? attributes.monitor_slave_lag_when_null : mysql_thread___monitor_slave_lag_when_null;
}
MyHGC(int);
~MyHGC();
MySrvC *get_random_MySrvC(char * gtid_uuid, uint64_t gtid_trxid, int max_lag_ms, MySQL_Session *sess);
Expand Down Expand Up @@ -536,9 +541,10 @@ using address_t = std::string;
using port_t = unsigned int;
using read_only_t = int;
using current_replication_lag = int;
using override_replication_lag = bool;

using read_only_server_t = std::tuple<hostname_t,port_t,read_only_t>;
using replication_lag_server_t = std::tuple<hostgroupid_t,address_t,port_t,current_replication_lag>;
using replication_lag_server_t = std::tuple<hostgroupid_t,address_t,port_t,current_replication_lag,override_replication_lag>;

enum READ_ONLY_SERVER_T {
ROS_HOSTNAME = 0,
Expand All @@ -552,6 +558,7 @@ enum REPLICATION_LAG_SERVER_T {
RLS_ADDRESS,
RLS_PORT,
RLS_CURRENT_REPLICATION_LAG,
RLS_OVERRIDE_REPLICATION_LAG,
RLS__SIZE
};

Expand Down Expand Up @@ -1085,7 +1092,7 @@ class MySQL_HostGroups_Manager {
void push_MyConn_to_pool_array(MySQL_Connection **, unsigned int);
void destroy_MyConn_from_pool(MySQL_Connection *, bool _lock=true);

void replication_lag_action_inner(MyHGC *, const char*, unsigned int, int);
void replication_lag_action_inner(MyHGC *, const char*, unsigned int, int, bool);
void replication_lag_action(const std::list<replication_lag_server_t>& mysql_servers);
void read_only_action(char *hostname, int port, int read_only);
void read_only_action_v2(const std::list<read_only_server_t>& mysql_servers);
Expand Down
4 changes: 2 additions & 2 deletions include/SQLite3_Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class SQLite3_Server {
std::vector<table_def_t *> *tables_defs_readonly;
#endif // TEST_READONLY
#ifdef TEST_REPLICATIONLAG
std::unordered_map<std::string, int> replicationlag_map;
std::unordered_map<std::string, std::unique_ptr<int>> replicationlag_map;
std::vector<table_def_t*>* tables_defs_replicationlag;
#endif // TEST_REPLICATIONLAG
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY) || defined(TEST_REPLICATIONLAG)
Expand Down Expand Up @@ -105,7 +105,7 @@ class SQLite3_Server {
#ifdef TEST_REPLICATIONLAG
pthread_mutex_t test_replicationlag_mutex;
void load_replicationlag_table(MySQL_Session* sess);
int replicationlag_test_value(const char* p);
int* replicationlag_test_value(const char* p);
int replicationlag_map_size() {
return replicationlag_map.size();
}
Expand Down
1 change: 1 addition & 0 deletions lib/MyHGC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ void MyHGC::reset_attributes() {
attributes.autocommit = -1;
attributes.free_connections_pct = 10;
attributes.handle_warnings = -1;
attributes.monitor_slave_lag_when_null = -1;
attributes.multiplex = true;
attributes.connection_warming = false;
free(attributes.init_connect);
Expand Down
34 changes: 24 additions & 10 deletions lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2685,9 +2685,16 @@ void MySQL_HostGroups_Manager::add(MySrvC *mysrvc, unsigned int _hid) {
myhgc->mysrvs->add(mysrvc);
}

void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const char *address, unsigned int port, int current_replication_lag) {
int j;
for (j=0; j<(int)myhgc->mysrvs->cnt(); j++) {
void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const char *address, unsigned int port,
int current_replication_lag, bool override_repl_lag) {

if (current_replication_lag == -1 && override_repl_lag == true) {
current_replication_lag = myhgc->get_monitor_slave_lag_when_null();
override_repl_lag = false;
proxy_error("Replication lag on server %s:%d is NULL, using value %d\n", address, port, current_replication_lag);
}

for (int j=0; j<(int)myhgc->mysrvs->cnt(); j++) {
MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j);
if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) {
mysrvc->cur_replication_lag = current_replication_lag;
Expand All @@ -2696,9 +2703,9 @@ void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const
// (current_replication_lag==-1 )
// ||
(
current_replication_lag>=0 &&
current_replication_lag >= 0 &&
mysrvc->max_replication_lag > 0 && // see issue #4018
((unsigned int)current_replication_lag > mysrvc->max_replication_lag)
(current_replication_lag > (int)mysrvc->max_replication_lag)
)
) {
// always increase the counter
Expand All @@ -2723,9 +2730,10 @@ void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const
} else {
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) {
if (
(current_replication_lag>=0 && ((unsigned int)current_replication_lag <= mysrvc->max_replication_lag))
(/*current_replication_lag >= 0 &&*/override_repl_lag == false &&
(current_replication_lag <= (int)mysrvc->max_replication_lag))
||
(current_replication_lag==-2) // see issue 959
(current_replication_lag==-2 && override_repl_lag == true) // see issue 959
) {
mysrvc->status=MYSQL_SERVER_STATUS_ONLINE;
proxy_warning("Re-enabling server %s:%d from HG %u with replication lag of %d second\n", address, port, myhgc->hid, current_replication_lag);
Expand All @@ -2751,18 +2759,19 @@ void MySQL_HostGroups_Manager::replication_lag_action(const std::list<replicatio
const std::string& address = std::get<REPLICATION_LAG_SERVER_T::RLS_ADDRESS>(server);
const unsigned int port = std::get<REPLICATION_LAG_SERVER_T::RLS_PORT>(server);
const int current_replication_lag = std::get<REPLICATION_LAG_SERVER_T::RLS_CURRENT_REPLICATION_LAG>(server);
const bool override_repl_lag = std::get<REPLICATION_LAG_SERVER_T::RLS_OVERRIDE_REPLICATION_LAG>(server);

if (mysql_thread___monitor_replication_lag_group_by_host == false) {
// legacy check. 1 check per server per hostgroup
MyHGC *myhgc = MyHGC_find(hid);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag,override_repl_lag);
}
else {
// only 1 check per server, no matter the hostgroup
// all hostgroups must be searched
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC*myhgc=(MyHGC*)MyHostGroups->index(i);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag,override_repl_lag);
}
}
}
Expand Down Expand Up @@ -6219,8 +6228,13 @@ void init_myhgc_hostgroup_settings(const char* hostgroup_settings, MyHGC* myhgc)
nlohmann::json j = nlohmann::json::parse(hostgroup_settings);

const auto handle_warnings_check = [](int8_t handle_warnings) -> bool { return handle_warnings == 0 || handle_warnings == 1; };
int8_t handle_warnings = j_get_srv_default_int_val<int8_t>(j, hid, "handle_warnings", handle_warnings_check);
const int8_t handle_warnings = j_get_srv_default_int_val<int8_t>(j, hid, "handle_warnings", handle_warnings_check);
myhgc->attributes.handle_warnings = handle_warnings;

const auto monitor_slave_lag_when_null_check = [](int32_t monitor_slave_lag_when_null) -> bool
{ return (monitor_slave_lag_when_null >= 0 && monitor_slave_lag_when_null <= 604800); };
const int32_t monitor_slave_lag_when_null = j_get_srv_default_int_val<int32_t>(j, hid, "monitor_slave_lag_when_null", monitor_slave_lag_when_null_check);
myhgc->attributes.monitor_slave_lag_when_null = monitor_slave_lag_when_null;
}
catch (const json::exception& e) {
proxy_error(
Expand Down
21 changes: 11 additions & 10 deletions lib/MySQL_Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2756,6 +2756,7 @@ void * monitor_replication_lag_thread(void *arg) {
ASSERT_SQLITE_OK(rc, mmsd->mondb);
// 'replication_lag' to be feed to 'replication_lag_action'
int repl_lag=-2;
bool override_repl_lag = true;
rc=(*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc=(*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb);
unsigned long long time_now=realtime_time();
Expand Down Expand Up @@ -2792,16 +2793,16 @@ void * monitor_replication_lag_thread(void *arg) {
MYSQL_ROW row=mysql_fetch_row(mmsd->result);
if (row) {
repl_lag=-1; // this is old behavior
repl_lag=mysql_thread___monitor_slave_lag_when_null; // new behavior, see 669
override_repl_lag = true;
if (row[j]) { // if Seconds_Behind_Master is not NULL
repl_lag=atoi(row[j]);
override_repl_lag = false;
} else {
proxy_error("Replication lag on server %s:%d is NULL, using the value %d (mysql-monitor_slave_lag_when_null)\n", mmsd->hostname, mmsd->port, mysql_thread___monitor_slave_lag_when_null);
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG);
}
}
}
if (repl_lag>=0) {
if (/*repl_lag >= 0 ||*/ override_repl_lag == false) {
rc=(*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else {
rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
Expand All @@ -2822,7 +2823,7 @@ void * monitor_replication_lag_thread(void *arg) {
rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
MyHGM->replication_lag_action( std::list<replication_lag_server_t> {
replication_lag_server_t {mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag}
replication_lag_server_t {mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag }
} );
(*proxy_sqlite3_finalize)(statement);
if (mmsd->mysql_error_msg == NULL) {
Expand Down Expand Up @@ -7741,8 +7742,7 @@ void MySQL_Monitor::monitor_gr_async_actions_handler(


bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vector<MySQL_Monitor_State_Data*>& mmsds) {

std::list<std::tuple<int, std::string, unsigned int, int>> mysql_servers;
std::list<replication_lag_server_t> mysql_servers;

for (auto& mmsd : mmsds) {

Expand Down Expand Up @@ -7784,6 +7784,7 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
ASSERT_SQLITE_OK(rc, mmsd->mondb);
// 'replication_lag' to be feed to 'replication_lag_action'
int repl_lag = -2;
bool override_repl_lag = true;
rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb);
unsigned long long time_now = realtime_time();
Expand Down Expand Up @@ -7820,16 +7821,16 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
if (row) {
repl_lag = -1; // this is old behavior
repl_lag = mysql_thread___monitor_slave_lag_when_null; // new behavior, see 669
override_repl_lag = true;
if (row[j]) { // if Seconds_Behind_Master is not NULL
repl_lag = atoi(row[j]);
override_repl_lag = false;
} else {
proxy_error("Replication lag on server %s:%d is NULL, using the value %d (mysql-monitor_slave_lag_when_null)\n", mmsd->hostname, mmsd->port, mysql_thread___monitor_slave_lag_when_null);
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG);
}
}
}
if (repl_lag >= 0) {
if (/*repl_lag >= 0 ||*/ override_repl_lag == false) {
rc = (*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else {
rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
Expand All @@ -7851,7 +7852,7 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
//MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag);
(*proxy_sqlite3_finalize)(statement);
mysql_servers.push_back( std::tuple<int,std::string,int,int> { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag });
mysql_servers.push_back( replication_lag_server_t { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag });
}

//executing replication lag action
Expand Down
14 changes: 0 additions & 14 deletions lib/ProxySQL_Cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,13 +562,6 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
checksums_values.mysql_servers.checksum, GloVars.checksums_values.mysql_servers.checksum, checksums_values.mysql_servers.diff_check);
}
if (strcmp(checksums_values.mysql_servers.checksum, GloVars.checksums_values.mysql_servers.checksum) == 0) {
// See LOGGING-NOTE at 'admin_variables' above.
if (checksums_values.mysql_servers.last_changed == now) {
proxy_info(
"Cluster: checksum for mysql_servers from peer %s:%d matches with local checksum %s , we won't sync.\n",
hostname, port, GloVars.checksums_values.mysql_servers.checksum
);
}
checksums_values.mysql_servers.diff_check = 0;
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Checksum for mysql_servers from peer %s:%d matches with local checksum %s, reset diff_check to 0.\n", hostname, port, GloVars.checksums_values.mysql_servers.checksum);
}
Expand Down Expand Up @@ -609,13 +602,6 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
checksums_values.mysql_servers_v2.checksum, GloVars.checksums_values.mysql_servers_v2.checksum, checksums_values.mysql_servers_v2.diff_check);
}
if (strcmp(checksums_values.mysql_servers_v2.checksum, GloVars.checksums_values.mysql_servers_v2.checksum) == 0) {
// See LOGGING-NOTE at 'admin_variables' above.
if (checksums_values.mysql_servers_v2.last_changed == now) {
proxy_info(
"Cluster: checksum for mysql_servers_v2 from peer %s:%d matches with local checksum %s , we won't sync.\n",
hostname, port, GloVars.checksums_values.mysql_servers_v2.checksum
);
}
checksums_values.mysql_servers_v2.diff_check = 0;
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Checksum for mysql_servers_v2 from peer %s:%d matches with local checksum %s, reset diff_check to 0.\n", hostname, port, GloVars.checksums_values.mysql_servers.checksum);
}
Expand Down
33 changes: 22 additions & 11 deletions src/SQLite3_Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,11 +879,17 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p
// probably never initialized
GloSQLite3Server->load_replicationlag_table(sess);
}
const int rc = GloSQLite3Server->replicationlag_test_value(query_no_space + strlen("SELECT SLAVE STATUS "));
const int* rc = GloSQLite3Server->replicationlag_test_value(query_no_space + strlen("SELECT SLAVE STATUS "));
free(query);
char* a = (char*)"SELECT %d as Seconds_Behind_Master";
query = (char*)malloc(strlen(a) + 2);
sprintf(query, a, rc);
if (rc == nullptr) {
const char* a = (char*)"SELECT null as Seconds_Behind_Master";
query = (char*)malloc(strlen(a) + 2);
sprintf(query, a);
} else {
const char* a = (char*)"SELECT %d as Seconds_Behind_Master";
query = (char*)malloc(strlen(a) + 2);
sprintf(query, a, *rc);
}
pthread_mutex_unlock(&GloSQLite3Server->test_replicationlag_mutex);
}
}
Expand Down Expand Up @@ -1845,7 +1851,7 @@ bool SQLite3_Server::init() {
insert_into_tables_defs(tables_defs_replicationlag,
(const char*)"REPLICATIONLAG_HOST_STATUS",
(const char*)"CREATE TABLE REPLICATIONLAG_HOST_STATUS ("
"hostname VARCHAR NOT NULL, port INT NOT NULL, seconds_behind_master INT NOT NULL, PRIMARY KEY (hostname, port)"
"hostname VARCHAR NOT NULL, port INT NOT NULL, seconds_behind_master INT DEFAULT NULL, PRIMARY KEY (hostname, port)"
")"
);

Expand Down Expand Up @@ -2016,27 +2022,32 @@ void SQLite3_Server::load_replicationlag_table(MySQL_Session* sess) {
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
const std::string& s = std::string(r->fields[0]) + ":" + std::string(r->fields[1]);
replicationlag_map[s] = atoi(r->fields[2]);

if (r->fields[2] == nullptr) {
replicationlag_map[s] = nullptr;
} else {
replicationlag_map[s] = std::make_unique<int>(atoi(r->fields[2]));
}
}
}
delete resultset;
if (replicationlag_map.size() == 0) {
GloAdmin->admindb->execute_statement((char*)"SELECT DISTINCT hostname, port FROM mysql_servers WHERE hostgroup_id BETWEEN 5202 AND 5700", &error, &cols, &affected_rows, &resultset);
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
const std::string& s = "INSERT INTO REPLICATIONLAG_HOST_STATUS VALUES ('" + std::string(r->fields[0]) + "'," + std::string(r->fields[1]) + ",0)";
const std::string& s = "INSERT INTO REPLICATIONLAG_HOST_STATUS VALUES ('" + std::string(r->fields[0]) + "'," + std::string(r->fields[1]) + ",null)";
sessdb->execute(s.c_str());
}
delete resultset;
}
GloAdmin->mysql_servers_wrunlock();
}

int SQLite3_Server::replicationlag_test_value(const char* p) {
int rc = 0; // default
std::unordered_map<std::string, int>::iterator it = replicationlag_map.find(std::string(p));
int* SQLite3_Server::replicationlag_test_value(const char* p) {
int* rc = 0; // default
std::unordered_map<std::string, std::unique_ptr<int>>::iterator it = replicationlag_map.find(std::string(p));
if (it != replicationlag_map.end()) {
rc = it->second;
rc = it->second.get();
}
return rc;
}
Expand Down
Loading