replicators: Enhance MySQL Snapshot.
Adjust the MySQL snapshot to use the new snapshot type. The snapshot
type checks whether the table has a primary key or unique key and uses
it to define batches for querying the table, making the snapshot less
intrusive, especially for large tables. If the table has neither a
primary key nor a unique key, the snapshot falls back to a full table
scan.

Fixes: REA-4477
Fixes: #1303

Release-Note-Core: Enhance the MySQL snapshot to use the primary key or a
   unique key when available. This makes the snapshot less intrusive than a
   `SELECT *` (full table scan) for large tables.

Change-Id: Iafda6ea6c74888262a0eea8bc1e880a3214b068a
altmannmarcelo committed Jul 2, 2024
1 parent d6f38ab commit 053a0f1
Showing 3 changed files with 81 additions and 91 deletions.
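
For context, the queries issued by the key-based strategy have roughly the following shape. This is an illustrative sketch, not the literal SQL built by `SnapshotType::get_queries` (which is not shown in full in this diff); the table `t`, key column `id`, and literal batch size are assumptions, with the real batch size coming from `MYSQL_BATCH_SIZE` in utils.rs:

```sql
-- Row count, used for progress reporting
SELECT count(*) FROM t;
-- First batch: start from the smallest key
SELECT * FROM t ORDER BY id ASC LIMIT 100000;
-- Subsequent batches: resume strictly after the last key seen (keyset pagination)
SELECT * FROM t WHERE id > ? ORDER BY id ASC LIMIT 100000;
```

Because each batch is bounded by the key, MySQL can walk the index in short queries instead of holding one long-running full scan open, which is what makes the snapshot less intrusive on large tables.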
165 changes: 81 additions & 84 deletions replicators/src/mysql_connector/snapshot.rs
@@ -25,10 +25,11 @@ use tracing_futures::Instrument;

use super::utils::mysql_pad_collation_column;
use crate::db_util::DatabaseSchemas;
use crate::mysql_connector::snapshot_type::SnapshotType;
use crate::table_filter::TableFilter;
use crate::TablesSnapshottingGaugeGuard;

const BATCH_SIZE: usize = 1000; // How many queries to buffer before pushing to ReadySet
const RS_BATCH_SIZE: usize = 1000; // How many rows to buffer before pushing to ReadySet

const MAX_SNAPSHOT_BATCH: usize = 8; // How many tables to snapshot at the same time

@@ -323,10 +324,8 @@ impl MySqlReplicator {
Ok((tx, table_list))
}

/// Call `SELECT * FROM table` and convert all rows into a ReadySet row
/// it may seem inefficient but apparently that is the correct way to
/// replicate a table, and `mysqldump` and `debezium` do just that
pub(crate) async fn dump_table(&self, table: &Relation) -> mysql::Result<TableDumper> {
/// Get one transaction that will be used to snapshot table data
pub(crate) async fn get_one_transaction(&self) -> mysql::Result<Transaction<'static>> {
let mut tx = self
.pool
.start_transaction(tx_opts())
@@ -338,16 +337,7 @@
.await
.map_err(log_err);

let query_count = format!(
"select count(*) from {}",
table.display(nom_sql::Dialect::MySQL)
);
let query = format!("select * from {}", table.display(nom_sql::Dialect::MySQL));
Ok(TableDumper {
query_count,
query,
tx,
})
Ok(tx)
}

/// Get MySQL Server Version
@@ -407,25 +397,39 @@ impl MySqlReplicator {
Ok(conn)
}

/// Replicate a single table from the provided TableDumper and into ReadySet by
/// converting every MySQL row into ReadySet row and calling `insert_many` in batches
async fn replicate_table(
mut dumper: TableDumper,
/// Copy all the rows from the provided table into ReadySet by
/// converting every MySQL row into a ReadySet row and calling `insert_many` in batches.
/// Depending on the table schema, we may use a key-based snapshotting strategy,
/// which batches rows based on the table's primary key or a unique key.
/// If the table has neither a primary key nor a unique key, we fall back to a full table scan.
///
/// # Arguments
/// * `trx` - The transaction to use for snapshotting. This transaction was opened while the
/// table was locked, meaning it will see the same data as the binlog position recorded for
/// this table.
/// * `table_mutator` - The table mutator to insert the rows into
async fn snapshot_table(
mut trx: Transaction<'static>,
mut table_mutator: readyset_client::Table,
snapshot_report_interval_secs: u16,
) -> ReadySetResult<()> {
let mut cnt = 0;

let mut snapshot_type = SnapshotType::new(&table_mutator)?;
let (count_query, initial_query, bound_base_query) =
snapshot_type.get_queries(&table_mutator);
// Query for number of rows first
let nrows: usize = dumper
.tx
.query_first(&dumper.query_count)
let nrows: usize = trx
.query_first(count_query)
.await
.map_err(log_err)?
.unwrap_or(0);

let mut row_stream = dumper.stream().await.map_err(log_err)?;
let mut rows = Vec::with_capacity(BATCH_SIZE);
let mut row_stream = trx
.exec_iter(&initial_query, mysql::Params::Empty)
.await
.map_err(log_err)?;
let mut rows = Vec::with_capacity(RS_BATCH_SIZE);

info!(rows = %nrows, "Snapshotting started");

@@ -436,36 +440,61 @@
let mut last_report_time = start_time;
let snapshot_report_interval_secs = snapshot_report_interval_secs as u64;

loop {
let row = match row_stream.next().await {
Ok(Some(row)) => row,
Ok(None) => break,
Err(err) if cnt == nrows => {
info!(error = %err, "Error encountered during snapshot, but all rows replicated successfully");
break;
}
Err(err) => {
return Err(log_err(err));
// Loop until we have no more batches to process
while cnt != nrows {
// Still have rows in this batch
while !row_stream.is_empty() {
let row = row_stream.next().await.map_err(log_err)?;
let df_row = match row.as_ref().map(mysql_row_to_noria_row).transpose() {
Ok(Some(df_row)) => df_row,
Ok(None) => break,
Err(err) if cnt == nrows => {
info!(error = %err, "Error encountered during snapshot, but all rows replicated successfully");
break;
}
Err(err) => {
return Err(log_err(err));
}
};
rows.push(df_row);
cnt += 1;

if rows.len() == RS_BATCH_SIZE {
// We aggregate rows into batches and then send them all to noria
let send_rows = std::mem::replace(&mut rows, Vec::with_capacity(RS_BATCH_SIZE));
table_mutator
.insert_many(send_rows)
.await
.map_err(log_err)?;
}
};

rows.push(row);
cnt += 1;
if row_stream.is_empty() && cnt != nrows && snapshot_type.is_key_based() {
// Last row of this batch: record it as the lower bound for the next batch's query.
snapshot_type.set_lower_bound(row.as_ref().unwrap());
}

if rows.len() == BATCH_SIZE {
// We aggregate rows into batches and then send them all to noria
let send_rows = std::mem::replace(&mut rows, Vec::with_capacity(BATCH_SIZE));
table_mutator
.insert_many(send_rows)
if snapshot_report_interval_secs != 0
&& last_report_time.elapsed().as_secs() > snapshot_report_interval_secs
{
last_report_time = Instant::now();
crate::log_snapshot_progress(start_time.elapsed(), cnt as i64, nrows as i64);
}
}
if cnt != nrows {
// Next batch
row_stream = trx
.exec_iter(
&bound_base_query,
mysql::Params::Positional(snapshot_type.get_lower_bound()?),
)
.await
.map_err(log_err)?;
}

if snapshot_report_interval_secs != 0
&& last_report_time.elapsed().as_secs() > snapshot_report_interval_secs
{
last_report_time = Instant::now();
crate::log_snapshot_progress(start_time.elapsed(), cnt as i64, nrows as i64);
if row_stream.is_empty() {
return Err(internal_err!(
"Snapshotting for table {:?} stopped before all rows were replicated. Next batch query returned no rows.",
table_mutator.table_name()
));
}
}
}

@@ -589,7 +618,7 @@ impl MySqlReplicator {
let repl_offset = ReplicationOffset::from(self.get_binlog_position().await?);
span.in_scope(|| info!("Snapshotting table"));

let dumper = self.dump_table(&table).instrument(span.clone()).await?;
let trx = self.get_one_transaction().instrument(span.clone()).await?;

// At this point we have a transaction that will see *that* table at *this* binlog
// position, so we can drop the read lock
@@ -602,7 +631,7 @@
(
table,
repl_offset,
Self::replicate_table(dumper, table_mutator, snapshot_report_interval_secs)
Self::snapshot_table(trx, table_mutator, snapshot_report_interval_secs)
.instrument(span)
.await,
)
@@ -733,40 +762,8 @@ impl MySqlReplicator {
}
}

/// An intermediary struct that can be used to get a stream of ReadySet rows
// This is required because mysql::QueryResult borrows from conn and then
// we have some hard to solve borrowing issues
pub(crate) struct TableDumper {
query_count: String,
query: String,
tx: mysql::Transaction<'static>,
}

impl TableDumper {
pub(crate) async fn stream(&mut self) -> mysql::Result<TableStream<'_>> {
Ok(TableStream {
query: self.tx.exec_iter(&self.query, ()).await?,
})
}
}

// Just another helper struct to make it streamable
pub(crate) struct TableStream<'a> {
query: mysql::QueryResult<'a, 'static, mysql::BinaryProtocol>,
}

impl<'a> TableStream<'a> {
/// Get the next row from the query response
pub(crate) async fn next<'b>(
&'b mut self,
) -> ReadySetResult<Option<Vec<readyset_data::DfValue>>> {
let next_row = self.query.next().await?;
next_row.map(mysql_row_to_noria_row).transpose()
}
}

/// Convert each entry in a row to a ReadySet type that can be inserted into the base tables
fn mysql_row_to_noria_row(row: mysql::Row) -> ReadySetResult<Vec<readyset_data::DfValue>> {
fn mysql_row_to_noria_row(row: &mysql::Row) -> ReadySetResult<Vec<readyset_data::DfValue>> {
let mut noria_row = Vec::with_capacity(row.len());
for idx in 0..row.len() {
let val = value_to_value(row.as_ref(idx).unwrap());
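
The control flow above — drain a batch, remember the last key, issue the next bound query until the row count is reached — can be hard to follow interleaved with the diff. Below is a minimal, self-contained Rust sketch of the same keyset-pagination loop. It is an illustration only: `fetch_batch` stands in for the MySQL round trip, rows are simplified to integer vectors with the key in column 0, and termination uses an empty batch rather than the row-count comparison the real code performs.

```rust
// Illustrative sketch of key-based snapshot batching (not the crate's API).
type Row = Vec<u64>; // simplified row; index 0 is the key column

/// Emulates `SELECT * FROM t WHERE key > ? ORDER BY key LIMIT n`
/// against an in-memory, key-sorted "table".
fn fetch_batch(lower_bound: Option<u64>, batch_size: usize, table: &[Row]) -> Vec<Row> {
    table
        .iter()
        .filter(|row| lower_bound.map_or(true, |lb| row[0] > lb))
        .take(batch_size)
        .cloned()
        .collect()
}

/// Snapshot the whole table in key-ordered batches.
fn snapshot(table: &[Row], batch_size: usize) -> Vec<Row> {
    let mut out = Vec::new();
    let mut lower_bound = None;
    loop {
        let batch = fetch_batch(lower_bound, batch_size, table);
        match batch.last() {
            // Advance the cursor strictly past the last key in this batch.
            Some(last) => lower_bound = Some(last[0]),
            // Empty batch: every row has been replicated.
            None => break,
        }
        out.extend(batch);
    }
    out
}

fn main() {
    // A key-sorted "table" of 10 rows, snapshotted 3 rows at a time.
    let table: Vec<Row> = (1..=10u64).map(|k| vec![k, k * 100]).collect();
    assert_eq!(snapshot(&table, 3).len(), table.len());
    println!("replicated {} rows", snapshot(&table, 3).len());
}
```

The key property is that each iteration re-queries from the last key onward, so no single query has to scan the whole table and the work naturally resumes batch by batch.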
6 changes: 0 additions & 6 deletions replicators/src/mysql_connector/snapshot_type.rs
@@ -8,7 +8,6 @@ use super::utils::MYSQL_BATCH_SIZE;
/// The type of snapshot to be taken
/// KeyBased: Snapshot based on the primary key or unique key
/// FullTableScan: Snapshot the entire table
#[allow(dead_code)]
pub enum SnapshotType {
KeyBased {
keys: Vec<String>,
@@ -18,7 +17,6 @@
}

impl SnapshotType {
#[allow(dead_code)]
pub fn new(table: &readyset_client::Table) -> ReadySetResult<Self> {
let cts = match table.schema() {
Some(cts) => cts,
@@ -45,7 +43,6 @@ impl SnapshotType {
/// Returns:
/// * The lower bound
/// Errors if the snapshot type is FullTableScan or the lower bound is not set
#[allow(dead_code)]
pub fn get_lower_bound(&mut self) -> ReadySetResult<Vec<mysql::Value>> {
match self {
SnapshotType::KeyBased {
@@ -71,7 +68,6 @@
///
/// Returns:
/// * A tuple containing the count query, the initial query, and the bound based query
#[allow(dead_code)]
pub fn get_queries(&self, table: &readyset_client::Table) -> (String, String, String) {
//TODO(marce): COUNT(1) or COUNT(PK) might have better performance
let count_query = format!(
@@ -127,7 +123,6 @@
///
/// Arguments:
/// * `row` - The row to compute the lower bound from
#[allow(dead_code)]
pub fn set_lower_bound(&mut self, row: &mysql::Row) {
match self {
SnapshotType::KeyBased {
@@ -151,7 +146,6 @@
///
/// Returns:
/// * True if the snapshot type is key based, false otherwise
#[allow(dead_code)]
pub fn is_key_based(&self) -> bool {
matches!(self, SnapshotType::KeyBased { .. })
}
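
One detail the snapshot_type.rs hunks only hint at: when the chosen key is composite, the bound query cannot compare only the first key column, or batches would skip or repeat rows. A hedged sketch of what the bound query could look like for a hypothetical composite key (region, id) — the exact SQL emitted by `get_queries` is outside this diff:

```sql
-- Row-value comparison resumes after the full composite key, not just its first column.
SELECT * FROM t
WHERE (region, id) > (?, ?)
ORDER BY region ASC, id ASC
LIMIT 100000;
```

The positional parameters are the key values `set_lower_bound` captured from the last row of the previous batch, which `get_lower_bound` hands back to the executor as `mysql::Params::Positional`.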
1 change: 0 additions & 1 deletion replicators/src/mysql_connector/utils.rs
@@ -6,7 +6,6 @@ use mysql_srv::ColumnType;
use readyset_data::DfValue;

//TODO(marce): Make this a configuration parameter or dynamically adjust based on the table size
#[allow(dead_code)]
pub const MYSQL_BATCH_SIZE: usize = 100_000; // How many rows to fetch at a time from MySQL

/// Pad a MYSQL_TYPE_STRING (CHAR / BINARY) column value to the correct length for the given column
