Skip to content

Commit

Permalink
replicators: report mysql position to keep distance short
Browse files Browse the repository at this point in the history
The MySQL min and max positions can be far apart. This happens on
unbalance workloads, where for example one of the tables do not
receive updates for a long period of time, like a config table that
might be static. This causes the MySQL min position to fall behind
the max position, in case of a restart, the replicator has to catch
up starting from the min position. This causes a lot on unnecessary
data to be re-streamed.
Currently we only update the min position when MySQL rotates the
binary log and we receive a EventType::ROTATE_EVENT.
Adjusting the position on all tables might be costly if the
installation has a lot of tables.

This commit adjust the replicator to adjust table position on a fixed
interval. This interval is hardcoded to 60 seconds.
The event we act on is either the EventType::QueryEvent when we
receive a "COMMIT" query, or the EventType::XidEvent. They are
virtually the same thing (a commit), but depending on the storage
engine MySQL reports a query COMMIT or an XID event.
We also report the position once we have finished the initial catch up
phase.

Ref: REA-4326
Closes: #1223

Change-Id: I6dfaf523b8851597a6a0fd97f4d4627ca2f4ea80
  • Loading branch information
altmannmarcelo committed Apr 24, 2024
1 parent 3cb61fe commit dd425fe
Showing 1 changed file with 98 additions and 20 deletions.
118 changes: 98 additions & 20 deletions replicators/src/mysql_connector/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use mysql_async as mysql;
use mysql_common::binlog;
use mysql_common::binlog::row::BinlogRow;
use mysql_common::binlog::value::BinlogValue;
use nom_sql::Relation;
use nom_sql::{Relation, SqlIdentifier};
use readyset_client::metrics::recorded;
use readyset_client::recipe::ChangeList;
use readyset_data::{DfValue, Dialect};
Expand All @@ -25,6 +25,7 @@ use crate::noria_adapter::{Connector, ReplicationAction};

const CHECKSUM_QUERY: &str = "SET @master_binlog_checksum='CRC32'";
const DEFAULT_SERVER_ID: u32 = u32::MAX - 55;
const MAX_POSITION_TIME: u64 = 60;

/// A connector that connects to a MySQL server and starts reading binlogs from a given position.
///
Expand Down Expand Up @@ -55,6 +56,9 @@ pub(crate) struct MySqlBinlogConnector {
current_gtid: Option<u64>,
/// Whether to log statements received by the connector
enable_statement_logging: bool,
/// Timestamp of the last reported position. This is use to ensure we keep the distance
/// between min/max position as short as possible.
last_reported_pos_ts: std::time::Instant,
}

impl MySqlBinlogConnector {
Expand Down Expand Up @@ -131,6 +135,8 @@ impl MySqlBinlogConnector {
next_position,
current_gtid: None,
enable_statement_logging,
last_reported_pos_ts: std::time::Instant::now()
- std::time::Duration::from_secs(MAX_POSITION_TIME),
};

connector.register_as_replica().await?;
Expand Down Expand Up @@ -376,14 +382,16 @@ impl MySqlBinlogConnector {
///
/// # Arguments
///
/// * `q_event` - the query event to process
/// * `q_event` - the query event to process.
/// * `is_last` - a boolean indicating if this is the last event during catchup.
///
/// # Returns
/// This function might return an error of type `ReadySetError::SkipEvent` if the query does not
/// affect the schema. This event should be skipped in the parent function.
async fn process_event_query(
&mut self,
q_event: mysql_common::binlog::events::QueryEvent<'_>,
is_last: bool,
) -> mysql::Result<ReplicationAction> {
// Written when an updating statement is done.
if self.enable_statement_logging {
Expand All @@ -405,11 +413,22 @@ impl MySqlBinlogConnector {
}
// If the query does not affect the schema, just keep going
// TODO: Transactions begin with the `BEGIN` queries, but we do not
// currently support those
// currently support those.
// `COMMIT` queries are issued for writes on non-transactional storage engines
// such as MyISAM. We report the position after the `COMMIT` query if necessary.
_ => {
return Err(mysql_async::Error::Other(Box::new(
ReadySetError::SkipEvent,
)))
match q_event.query().eq("COMMIT")
&& (self.report_position_elapsed().await || is_last)
{
true => {
return Ok(ReplicationAction::LogPosition);
}
false => {
return Err(mysql_async::Error::Other(Box::new(
ReadySetError::SkipEvent,
)))
}
}
}
};

Expand All @@ -430,9 +449,11 @@ impl MySqlBinlogConnector {
/// Merge table actions into a hashmap of actions.
/// If the table already exists in the hashmap, the actions are merged.
/// If the table does not exist in the hashmap, a new entry is created.
///
/// # Arguments
/// * `map` - the hashmap to merge the actions into
/// * `action` - the action to merge
///
/// # Returns
/// This function does not return anything, it modifies the hashmap in place.
async fn merge_table_actions(
Expand Down Expand Up @@ -471,11 +492,14 @@ impl MySqlBinlogConnector {
/// # Arguments
///
/// * `payload_event` - the payload event to process
/// * `is_last` - a boolean indicating if this is the last event during catchup.
///
/// # Returns
/// This function returns a vector of all actionable inner events
async fn process_event_transaction_payload(
&mut self,
payload_event: mysql_common::binlog::events::TransactionPayloadEvent<'_>,
is_last: bool,
) -> mysql::Result<Vec<ReplicationAction>> {
let mut hash_actions: HashMap<Relation, ReplicationAction> = HashMap::new();
if self.enable_statement_logging {
Expand All @@ -497,20 +521,33 @@ impl MySqlBinlogConnector {
})? {
EventType::QUERY_EVENT => {
// We only accept query events in the transaction payload that do not affect the
// schema. Those are BEGIN and COMMIT and they emit a
// `ReadySetError::SkipEvent`.
let _ = match self.process_event_query(binlog_ev.read_event()?).await {
// schema. Those are `BEGIN` and `COMMIT`. `BEGIN` will return a
// `ReadySetError::SkipEvent` and `COMMIT` will return a
// `ReplicationAction::LogPosition` if necessary. We skip
// `ReplicationAction::LogPosition` here because we will report the position
// only once at the end.
match self
.process_event_query(binlog_ev.read_event()?, is_last)
.await
{
Err(mysql_async::Error::Other(ref err))
if err.downcast_ref::<ReadySetError>()
== Some(&ReadySetError::SkipEvent) =>
{
continue;
}
Err(err) => return Err(err),
Ok(action) => mysql_async::Error::Other(Box::new(internal_err!(
"Unexpected query event in transaction payload {:?}",
action
))),
Ok(action) => match action {
ReplicationAction::LogPosition { .. } => {
continue;
}
_ => {
return Err(mysql_async::Error::Other(Box::new(internal_err!(
"Unexpected query event in transaction payload {:?}",
action
))));
}
},
};
}
EventType::WRITE_ROWS_EVENT => {
Expand Down Expand Up @@ -543,9 +580,34 @@ impl MySqlBinlogConnector {
}
}
}
// We will always have received at least one COMMIT from either COM_QUERY or XID_EVENT.
// To avoid reporting multiple times the same position we only report it once here if
// necessary.
if !hash_actions.is_empty() && (self.report_position_elapsed().await || is_last) {
hash_actions.insert(
Relation {
schema: None,
name: SqlIdentifier::from(""),
},
ReplicationAction::LogPosition,
);
}
Ok(hash_actions.into_values().collect())
}

/// Check whatever we need to report the current position
/// If last_reported_pos_ts has elapsed, update it with the current timestamp.
///
/// # Returns
/// This function returns a boolean indicating if we need to report the current position
async fn report_position_elapsed(&mut self) -> bool {
if self.last_reported_pos_ts.elapsed().as_secs() > MAX_POSITION_TIME {
self.last_reported_pos_ts = std::time::Instant::now();
return true;
}
false
}

/// Process binlog events until an actionable event occurs.
///
/// # Arguments
Expand All @@ -571,6 +633,13 @@ impl MySqlBinlogConnector {
self.next_position.position = u64::from(binlog_event.header().log_pos());
}

let is_last = match until {
Some(limit) => {
let limit = MySqlPosition::try_from(limit).expect("Valid binlog limit");
self.next_position >= limit
}
None => false,
};
match binlog_event.header().event_type().map_err(|ev| {
mysql_async::Error::Other(Box::new(internal_err!(
"Unknown binlog event type {}",
Expand All @@ -588,7 +657,10 @@ impl MySqlBinlogConnector {
}

EventType::QUERY_EVENT => {
let action = match self.process_event_query(binlog_event.read_event()?).await {
let action = match self
.process_event_query(binlog_event.read_event()?, is_last)
.await
{
Ok(action) => action,
Err(mysql_async::Error::Other(ref err))
if err.downcast_ref::<ReadySetError>()
Expand Down Expand Up @@ -649,12 +721,21 @@ impl MySqlBinlogConnector {

EventType::TRANSACTION_PAYLOAD_EVENT => {
return Ok((
self.process_event_transaction_payload(binlog_event.read_event()?)
self.process_event_transaction_payload(binlog_event.read_event()?, is_last)
.await?,
&self.next_position,
));
}

EventType::XID_EVENT => {
// Generated for a commit of a transaction that modifies one or more tables of
// an XA-capable storage engine (InnoDB).
if self.report_position_elapsed().await || is_last {
return Ok((vec![ReplicationAction::LogPosition], &self.next_position));
}
continue;
}

EventType::WRITE_ROWS_EVENT_V1 => unimplemented!(), /* The V1 event numbers are */
// used from 5.1.16 until
// mysql-5.6.
Expand Down Expand Up @@ -735,11 +816,8 @@ impl MySqlBinlogConnector {

// We didn't get an actionable event, but we still need to check that we haven't reached
// the until limit
if let Some(limit) = until {
let limit = MySqlPosition::try_from(limit).expect("Valid binlog limit");
if self.next_position >= limit {
return Ok((vec![ReplicationAction::LogPosition], &self.next_position));
}
if is_last {
return Ok((vec![ReplicationAction::LogPosition], &self.next_position));
}
}
}
Expand Down

0 comments on commit dd425fe

Please sign in to comment.