Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AWS RDS MySQL Multi-AZ Cluster auto-discovery #4406

Merged
merged 6 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/MySQL_HostGroups_Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,8 @@ class MySQL_HostGroups_Manager {
void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us);
unsigned long long Get_Memory_Stats();

void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>>& new_servers);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous comment not addressed, should be const.


void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup);
Expand Down
10 changes: 8 additions & 2 deletions include/MySQL_Monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ struct cmp_str {

#define N_L_ASE 16

#define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com"
#define QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY "SELECT @@global.read_only read_only, id, endpoint, port from mysql.rds_topology"

/*

Implementation of monitoring in AWS Aurora will be different than previous modules
Expand Down Expand Up @@ -197,7 +200,8 @@ enum MySQL_Monitor_State_Data_Task_Type {
MON_GROUP_REPLICATION,
MON_REPLICATION_LAG,
MON_GALERA,
MON_AWS_AURORA
MON_AWS_AURORA,
MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY
};

enum class MySQL_Monitor_State_Data_Task_Result {
Expand Down Expand Up @@ -229,6 +233,7 @@ class MySQL_Monitor_State_Data {
char *hostname;
int port;
int writer_hostgroup; // used only by group replication
int reader_hostgroup;
bool writer_is_also_reader; // used only by group replication
int max_transactions_behind; // used only by group replication
int max_transactions_behind_count; // used only by group replication
Expand Down Expand Up @@ -442,6 +447,7 @@ class MySQL_Monitor {
static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql);
static void trigger_dns_cache_update();

void process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, int reader_hostgroup);

private:
std::vector<table_def_t *> *tables_defs_monitor;
Expand Down Expand Up @@ -553,7 +559,7 @@ class MySQL_Monitor {
* Note: Calling init_async is mandatory before executing tasks asynchronously.
*/
void monitor_ping_async(SQLite3_result* resultset);
void monitor_read_only_async(SQLite3_result* resultset);
void monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check);
void monitor_replication_lag_async(SQLite3_result* resultset);
void monitor_group_replication_async();
void monitor_galera_async();
Expand Down
3 changes: 3 additions & 0 deletions include/MySQL_Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ struct p_th_gauge {
mysql_monitor_ping_interval,
mysql_monitor_ping_timeout,
mysql_monitor_ping_max_failures,
mysql_monitor_aws_rds_topology_discovery_interval,
mysql_monitor_read_only_interval,
mysql_monitor_read_only_timeout,
mysql_monitor_writer_is_also_reader,
Expand Down Expand Up @@ -385,6 +386,8 @@ class MySQL_Threads_Handler
int monitor_ping_max_failures;
//! Monitor ping timeout. Unit: 'ms'.
int monitor_ping_timeout;
//! Monitor aws rds topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'.
int monitor_aws_rds_topology_discovery_interval;
//! Monitor read only timeout. Unit: 'ms'.
int monitor_read_only_interval;
//! Monitor read only timeout. Unit: 'ms'.
Expand Down
2 changes: 2 additions & 0 deletions include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,7 @@ __thread int mysql_thread___monitor_connect_timeout;
__thread int mysql_thread___monitor_ping_interval;
__thread int mysql_thread___monitor_ping_max_failures;
__thread int mysql_thread___monitor_ping_timeout;
__thread int mysql_thread___monitor_aws_rds_topology_discovery_interval;
__thread int mysql_thread___monitor_read_only_interval;
__thread int mysql_thread___monitor_read_only_timeout;
__thread int mysql_thread___monitor_read_only_max_timeout_count;
Expand Down Expand Up @@ -1073,6 +1074,7 @@ extern __thread int mysql_thread___monitor_connect_timeout;
extern __thread int mysql_thread___monitor_ping_interval;
extern __thread int mysql_thread___monitor_ping_max_failures;
extern __thread int mysql_thread___monitor_ping_timeout;
extern __thread int mysql_thread___monitor_aws_rds_topology_discovery_interval;
extern __thread int mysql_thread___monitor_read_only_interval;
extern __thread int mysql_thread___monitor_read_only_timeout;
extern __thread int mysql_thread___monitor_read_only_max_timeout_count;
Expand Down
73 changes: 73 additions & 0 deletions lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2200,6 +2200,7 @@ bool MySQL_HostGroups_Manager::commit(
// fill Hostgroup_Manager_Mapping with latest records
update_hostgroup_manager_mappings();


ev_async_send(gtid_ev_loop, gtid_ev_async);

__sync_fetch_and_add(&status.servers_table_version,1);
Expand Down Expand Up @@ -8270,3 +8271,75 @@ MySQLServers_SslParams * MySQL_HostGroups_Manager::get_Server_SSL_Params(char *h
}
return NULL;
}

/**
* @brief Updates replication hostgroups by adding autodiscovered mysql servers.
* @details Adds each server from 'new_servers' to the 'runtime_mysql_servers' table.
* We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'.
* @param new_servers A vector of tuples where each tuple contains the values needed to add each new server.
*/
void MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>>& new_servers) {
int added_new_server;
Copy link
Collaborator

@JavierJF JavierJF Apr 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uninitialized variable, fine right now if the function isn't called with an empty list, could misbehave otherwise, better to initialize with -1.

wrlock();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing lock on Admin GloAdmin->mysql_servers_wrlock(), since get_admin_runtime_mysql_servers is used, Admin should be locked, in other places were both locks needs to be taken, we take them together for simplicity. See MySQL_HostGroups_Manager::update_aws_aurora_set_reader.


// Add the discovered server with default values
for (tuple<string, int, int> s : new_servers) {
string host = std::get<0>(s);
uint16_t port = std::get<1>(s);
long int hostgroup_id = std::get<2>(s);

srv_info_t srv_info { host.c_str(), port, "AWS RDS" };
srv_opts_t srv_opts { -1, -1, -1 };

added_new_server = create_new_server_in_hg(hostgroup_id, srv_info, srv_opts);
}

// If servers were added, perform necessary updates to internal structures
if (added_new_server > -1) {
purge_mysql_servers_table();
mydb->execute("DELETE FROM mysql_servers");
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
generate_mysql_servers_table();

// Update the global checksums after 'mysql_servers' regeneration
{
unique_ptr<SQLite3_result> resultset { get_admin_runtime_mysql_servers(mydb) };
string mysrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) };
save_runtime_mysql_servers(resultset.release());

// Update the runtime_mysql_servers checksum with the new checksum
uint64_t raw_checksum = this->runtime_mysql_servers ? this->runtime_mysql_servers->raw_checksum() : 0;
table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = raw_checksum;

// This is required for preserving coherence in the checksums, otherwise they would be inconsistent with `commit` generated checksums
SpookyHash rep_hgs_hash {};
bool init = false;
uint64_t servers_v2_hash = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS_V2];

if (servers_v2_hash) {
if (init == false) {
init = true;
rep_hgs_hash.Init(19, 3);
}

rep_hgs_hash.Update(&servers_v2_hash, sizeof(servers_v2_hash));
}

CUCFT1(
rep_hgs_hash, init, "mysql_replication_hostgroups", "writer_hostgroup",
table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]
);

proxy_info("Checksum for table %s is %s\n", "mysql_servers", mysrvs_checksum.c_str());

pthread_mutex_lock(&GloVars.checksum_mutex);
update_glovars_mysql_servers_checksum(mysrvs_checksum);
pthread_mutex_unlock(&GloVars.checksum_mutex);
}

update_table_mysql_servers_for_monitor(false);
update_hostgroup_manager_mappings();
}

wrunlock();
}
124 changes: 120 additions & 4 deletions lib/MySQL_Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,12 @@ void MySQL_Monitor_State_Data::init_async() {
task_timeout_ = mysql_thread___monitor_read_only_timeout;
task_handler_ = &MySQL_Monitor_State_Data::read_only_handler;
break;
case MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY:
query_ = QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY;
async_state_machine_ = ASYNC_QUERY_START;
task_timeout_ = mysql_thread___monitor_read_only_timeout;
task_handler_ = &MySQL_Monitor_State_Data::read_only_handler;
break;
#else // TEST_READONLY
case MON_READ_ONLY:
case MON_INNODB_READ_ONLY:
Expand Down Expand Up @@ -1621,6 +1627,8 @@ void * monitor_read_only_thread(void *arg) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only&@@global.innodb_read_only read_only");
} else if (mmsd->get_task_type() == MON_READ_ONLY__OR__INNODB_READ_ONLY) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only|@@global.innodb_read_only read_only");
} else if (mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY);
} else { // default
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only read_only");
}
Expand Down Expand Up @@ -3280,6 +3288,65 @@ VALGRIND_ENABLE_ERROR_REPORTING;
return ret;
}

/**
* @brief Processes the discovered servers to eventually add them to 'runtime_mysql_servers'.
* @details This method takes a vector of discovered servers, compares them against the existing servers, and adds the new servers to 'runtime_mysql_servers'.
* @param originating_server_hostname A string which denotes the hostname of the originating server, from which the discovered servers were queried and found.
* @param discovered_servers A vector of servers discovered when querying the cluster's topology.
* @param reader_hostgroup Reader hostgroup to which we will add the discovered servers.
*/
void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, int reader_hostgroup) {
char *error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result *runtime_mysql_servers = NULL;

char *query=(char *)"SELECT DISTINCT hostname FROM monitor_internal.mysql_servers ORDER BY hostname";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
monitordb->execute_statement(query, &error, &cols, &affected_rows, &runtime_mysql_servers);

if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
vector<tuple<string, int, int>> new_servers;
vector<string> saved_hostnames;
saved_hostnames.push_back(originating_server_hostname);

// Do an initial loop through the query results to save existing runtime server hostnames
for (std::vector<SQLite3_row *>::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) {
SQLite3_row *r1 = *it;
string current_runtime_hostname = r1->fields[0];

saved_hostnames.push_back(current_runtime_hostname);
}

// Loop through discovered servers and process the ones we haven't saved yet
for (MYSQL_ROW s : discovered_servers) {
string current_discovered_hostname = s[2];
string current_discovered_port_string = s[3];
int current_discovered_port_int;

try {
current_discovered_port_int = stoi(s[3]);
} catch (...) {
proxy_error("Unable to parse the port value during topology discovery: [%s]. Terminating discovery early.", current_discovered_port_string);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing newline character \n in error message. Also, C string not being passed (char*) instead, a std::string is feed, this is undefined behavior at best, potential crash at worst.

return;
}

if (find(saved_hostnames.begin(), saved_hostnames.end(), current_discovered_hostname) == saved_hostnames.end()) {
tuple<string, int, int> new_server(current_discovered_hostname, current_discovered_port_int, reader_hostgroup);
new_servers.push_back(new_server);
saved_hostnames.push_back(current_discovered_hostname);
}
}

// Add the new servers if any
if (!new_servers.empty()) {
MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers);
}
}
}

void * MySQL_Monitor::monitor_read_only() {
mysql_close(mysql_init(NULL));
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
Expand All @@ -3293,14 +3360,17 @@ void * MySQL_Monitor::monitor_read_only() {
unsigned long long t1;
unsigned long long t2;
unsigned long long next_loop_at=0;
int topology_loop = 0;
int topology_loop_max = mysql_thread___monitor_aws_rds_topology_discovery_interval;

while (GloMyMon->shutdown==false && mysql_thread___monitor_enabled==true) {
bool do_discovery_check = false;

unsigned int glover;
char *error=NULL;
SQLite3_result *resultset=NULL;
// add support for SSL
char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()";
char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type, reader_hostgroup FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()";
t1=monotonic_time();

if (!GloMTH) return NULL; // quick exit during shutdown/restart
Expand All @@ -3311,6 +3381,7 @@ void * MySQL_Monitor::monitor_read_only() {
next_loop_at=0;
}


if (t1 < next_loop_at) {
goto __sleep_monitor_read_only;
}
Expand All @@ -3327,8 +3398,14 @@ void * MySQL_Monitor::monitor_read_only() {
goto __end_monitor_read_only_loop;
}

if (topology_loop >= topology_loop_max) {
do_discovery_check = true;
topology_loop = 0;
}
topology_loop += 1;

// resultset must be initialized before calling monitor_read_only_async
monitor_read_only_async(resultset);
monitor_read_only_async(resultset, do_discovery_check);
if (shutdown) return NULL;

__end_monitor_read_only_loop:
Expand Down Expand Up @@ -7197,7 +7274,7 @@ bool MySQL_Monitor::monitor_read_only_process_ready_tasks(const std::vector<MySQ
std::list<read_only_server_t> mysql_servers;

for (auto& mmsd : mmsds) {

string originating_server_hostname = mmsd->hostname;
const auto task_result = mmsd->get_task_result();

assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING);
Expand Down Expand Up @@ -7267,6 +7344,38 @@ VALGRIND_ENABLE_ERROR_REPORTING;
}

rc = (*proxy_sqlite3_bind_int64)(statement, 5, read_only); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else if (fields && mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) {
// Process the read_only field as above and store the first server
vector<MYSQL_ROW> discovered_servers;
for (k = 0; k < num_fields; k++) {
if (strcmp((char*)"read_only", (char*)fields[k].name) == 0) {
j = k;
}
}
if (j > -1) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
if (row) {
discovered_servers.push_back(row);
VALGRIND_DISABLE_ERROR_REPORTING;
if (row[j]) {
if (!strcmp(row[j], "0") || !strcasecmp(row[j], "OFF"))
read_only = 0;
}
VALGRIND_ENABLE_ERROR_REPORTING;
}
}

// Store the remaining servers
int num_rows = mysql_num_rows(mmsd->result);
for (int i = 1; i < num_rows; i++) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
discovered_servers.push_back(row);
}

// Process the discovered servers and add them to 'runtime_mysql_servers'
if (!discovered_servers.empty()) {
process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup);
}
} else {
proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port);
rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
Expand Down Expand Up @@ -7327,7 +7436,7 @@ VALGRIND_ENABLE_ERROR_REPORTING;
return true;
}

void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) {
void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check) {
assert(resultset);

std::vector<std::unique_ptr<MySQL_Monitor_State_Data>> mmsds;
Expand All @@ -7350,11 +7459,18 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) {
} else if (strcasecmp(r->fields[3], (char*)"read_only|innodb_read_only") == 0) {
task_type = MON_READ_ONLY__OR__INNODB_READ_ONLY;
}

// Change task type if it's time to do discovery check. Only for aws rds endpoints
string hostname = r->fields[0];
if (do_discovery_check && hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) {
task_type = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY;
}
}

std::unique_ptr<MySQL_Monitor_State_Data> mmsd(
new MySQL_Monitor_State_Data(task_type, r->fields[0], atoi(r->fields[1]), atoi(r->fields[2])));

mmsd->reader_hostgroup = atoi(r->fields[4]); // set reader_hostgroup
mmsd->mondb = monitordb;
mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get());

Expand Down
Loading