Skip to content

Commit

Permalink
replicators: add collation support for CHAR and BINARY columns
Browse files Browse the repository at this point in the history
This commits adds proper collation support for CHAR and BINARY columns
in MySQL.
CHAR columns should be right padded with spaces to the column length
when storing them and BINARY should right pad zeros.

This commit fixes the issue at snapshot - During snapshot we do a
logical dump of data. MySQL removes padding spaces from CHAR columns
when retrieving them. So, we need to take the column collation into
consideration when storing them. One gotcha is with ENUM/SET columns,
they are retrieved as Strings(MYSQL_TYPE_STRING), but we should not
pad them.
During CDC, we need to retrieve proper
metadata from TME in order to validate if padding is necessary or not.

This commit also fixes an issue when storing BINARY columns. We were
storing them as TinyText/Text if the binary representation of the
columns was a valid UTF-8 string. This is not correct. We should store
them as ByteArray.

Test cases were written taking into consideration a mix of characters
from different bytes, like mixing ASCII and UTF-8 characters from
2nd and 3rd bytes.

Note: MySQL uses the terminology of charset and collation interchangeably.
In the end everything is stored as collation ID, which can be used to
determine the charset and collation.

Ref: REA-4366
Ref: REA-4383
Closes: #1247 #1259

Release-Note-Core: Added collation support for storing CHAR and BINARY
   columns in MySQL using the correct padding. This fixes an issue when
   looking up CHAR/BINARY columns with values that do not match the
   column length.

Change-Id: Ibb436b99b46500f940efe79d06d86494bfc4bf30
  • Loading branch information
altmannmarcelo committed May 24, 2024
1 parent 713673e commit ee1a20e
Show file tree
Hide file tree
Showing 6 changed files with 474 additions and 6 deletions.
137 changes: 137 additions & 0 deletions readyset-mysql/tests/query_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,3 +758,140 @@ async fn test_binlog_transaction_compression() {

shutdown_tx.shutdown().await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_char_padding_lookup() {
let query_status_cache: &'static _ = Box::leak(Box::new(QueryStatusCache::new()));
let (opts, _handle, shutdown_tx) = setup(
query_status_cache,
true, // fallback enabled
MigrationMode::OutOfBand,
UnsupportedSetMode::Error,
)
.await;
let mut conn = Conn::new(opts).await.unwrap();
conn.query_drop(
"CREATE TABLE `col_pad_lookup` (
id int NOT NULL PRIMARY KEY,
c CHAR(3)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;",
)
.await
.unwrap();
conn.query_drop("INSERT INTO `col_pad_lookup` VALUES (1, 'ࠈࠈ');")
.await
.unwrap();
conn.query_drop("INSERT INTO `col_pad_lookup` VALUES (2, 'A');")
.await
.unwrap();
conn.query_drop("INSERT INTO `col_pad_lookup` VALUES (3, 'AAA');")
.await
.unwrap();
sleep().await;
conn.query_drop("CREATE CACHE test FROM SELECT id, c FROM col_pad_lookup WHERE c = ?")
.await
.unwrap();
let row: Vec<(u32, String)> = conn
.query("SELECT id, c FROM col_pad_lookup WHERE c = 'ࠈࠈ'")
.await
.unwrap();
assert_eq!(row.len(), 1);
let last_status = last_query_info(&mut conn).await;
assert_eq!(last_status.destination, QueryDestination::Readyset);
assert_eq!(row[0].1, "ࠈࠈ ");

let row: Vec<(u32, String)> = conn
.query("SELECT id, c FROM col_pad_lookup WHERE c = 'A'")
.await
.unwrap();
assert_eq!(row.len(), 1);
let last_status = last_query_info(&mut conn).await;
assert_eq!(last_status.destination, QueryDestination::Readyset);
assert_eq!(row[0].1, "A ");

let row: Vec<(u32, String)> = conn
.query("SELECT id, c FROM col_pad_lookup WHERE c = 'AAA'")
.await
.unwrap();
assert_eq!(row.len(), 1);
let last_status = last_query_info(&mut conn).await;
assert_eq!(last_status.destination, QueryDestination::Readyset);
assert_eq!(row[0].1, "AAA");
shutdown_tx.shutdown().await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_binary_padding_lookup() {
let query_status_cache: &'static _ = Box::leak(Box::new(QueryStatusCache::new()));
let (opts, _handle, shutdown_tx) = setup(
query_status_cache,
true, // fallback enabled
MigrationMode::OutOfBand,
UnsupportedSetMode::Error,
)
.await;
let mut conn = Conn::new(opts).await.unwrap();
conn.query_drop(
"CREATE TABLE `col_pad_bin_lookup` (
id int NOT NULL PRIMARY KEY,
b BINARY(3)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;",
)
.await
.unwrap();
conn.query_drop("INSERT INTO `col_pad_bin_lookup` VALUES (1, 'ࠈ');")
.await
.unwrap();
conn.query_drop("INSERT INTO `col_pad_bin_lookup` VALUES (2, '¥');")
.await
.unwrap();
conn.query_drop("INSERT INTO `col_pad_bin_lookup` VALUES (3, 'A');")
.await
.unwrap();
conn.query_drop("INSERT INTO `col_pad_bin_lookup` VALUES (4, 'A¥');")
.await
.unwrap();

sleep().await;
conn.query_drop("CREATE CACHE test FROM SELECT id, b FROM col_pad_bin_lookup WHERE b = ?")
.await
.unwrap();
let row: Vec<(u32, String)> = conn
.query("SELECT id, b FROM col_pad_bin_lookup WHERE b = 'ࠈ'")
.await
.unwrap();
assert_eq!(row.len(), 1);
let last_status = last_query_info(&mut conn).await;
assert_eq!(last_status.destination, QueryDestination::Readyset);
assert_eq!(row[0].1, "0xe0a088");

let row: Vec<(u32, String)> = conn
.query("SELECT id, b FROM col_pad_bin_lookup WHERE b = '¥'")
.await
.unwrap();
assert_eq!(row.len(), 1);
let last_status = last_query_info(&mut conn).await;
assert_eq!(last_status.destination, QueryDestination::Readyset);
assert_eq!(row[0].1, "0xc2a500");

let row: Vec<(u32, String)> = conn
.query("SELECT id, b FROM col_pad_bin_lookup WHERE b = 'A'")
.await
.unwrap();
assert_eq!(row.len(), 1);
let last_status = last_query_info(&mut conn).await;
assert_eq!(last_status.destination, QueryDestination::Readyset);
assert_eq!(row[0].1, "0x410000");

let row: Vec<(u32, String)> = conn
.query("SELECT id, b FROM col_pad_bin_lookup WHERE b = 'A¥'")
.await
.unwrap();
assert_eq!(row.len(), 1);
let last_status = last_query_info(&mut conn).await;
assert_eq!(last_status.destination, QueryDestination::Readyset);
assert_eq!(row[0].1, "0x41c2a5");
shutdown_tx.shutdown().await;
}
33 changes: 28 additions & 5 deletions replicators/src/mysql_connector/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::io;
use async_trait::async_trait;
use binlog::consts::{BinlogChecksumAlg, EventType};
use metrics::counter;
use mysql::binlog::events::StatusVarVal;
use mysql::binlog::events::{OptionalMetaExtractor, StatusVarVal};
use mysql::binlog::jsonb::{self, JsonbToJsonError};
use mysql::prelude::Queryable;
use mysql_async as mysql;
Expand All @@ -22,6 +22,7 @@ use replication_offset::mysql::MySqlPosition;
use replication_offset::ReplicationOffset;
use tracing::{error, info, warn};

use crate::mysql_connector::utils::mysql_pad_collation_column;
use crate::noria_adapter::{Connector, ReplicationAction};

const CHECKSUM_QUERY: &str = "SET @source_binlog_checksum='CRC32'";
Expand Down Expand Up @@ -847,6 +848,7 @@ fn binlog_val_to_noria_val(
val: &mysql_common::value::Value,
col_kind: mysql_common::constants::ColumnType,
meta: &[u8],
collation: u16,
) -> mysql::Result<DfValue> {
// Not all values are coerced to the value expected by ReadySet directly

Expand All @@ -861,8 +863,8 @@ fn binlog_val_to_noria_val(
}
};

match (col_kind, meta) {
(ColumnType::MYSQL_TYPE_TIMESTAMP2, &[0]) => {
match (col_kind, meta, collation) {
(ColumnType::MYSQL_TYPE_TIMESTAMP2, &[0], _) => {
//https://github.com/blackbeam/rust_mysql_common/blob/408effed435c059d80a9e708bcfa5d974527f476/src/binlog/value.rs#L144
// When meta is 0, `mysql_common` encodes this value as number of seconds (since UNIX
// EPOCH)
Expand All @@ -878,7 +880,7 @@ fn binlog_val_to_noria_val(
// Can unwrap because we know it maps directly to [`DfValue`]
Ok(time.into())
}
(ColumnType::MYSQL_TYPE_TIMESTAMP2, _) => {
(ColumnType::MYSQL_TYPE_TIMESTAMP2, _, _) => {
// When meta is anything else, `mysql_common` encodes this value as number of
// seconds.microseconds (since UNIX EPOCH)
let s = String::from_utf8_lossy(buf);
Expand All @@ -891,6 +893,14 @@ fn binlog_val_to_noria_val(
// Can wrap because we know this maps directly to [`DfValue`]
Ok(time.into())
}
(ColumnType::MYSQL_TYPE_STRING, meta, collation) => {
Ok(mysql_pad_collation_column(
buf,
col_kind,
collation,
meta[1] as usize, // 2nd byte of meta is the length of the string
))
}
_ => Ok(val.try_into().map_err(|e| {
mysql_async::Error::Other(Box::new(internal_err!("Unable to coerce value {}", e)))
})?),
Expand All @@ -901,6 +911,9 @@ fn binlog_row_to_noria_row(
binlog_row: &BinlogRow,
tme: &binlog::events::TableMapEvent<'static>,
) -> mysql::Result<Vec<DfValue>> {
let opt_meta_extractor = OptionalMetaExtractor::new(tme.iter_optional_meta()).unwrap();
let mut charset_iter = opt_meta_extractor.iter_charset();
let mut enum_and_set_charset_iter = opt_meta_extractor.iter_enum_and_set_charset();
(0..binlog_row.len())
.map(|idx| {
match binlog_row.as_ref(idx).unwrap() {
Expand All @@ -916,7 +929,17 @@ fn binlog_row_to_noria_row(
.unwrap(),
tme.get_column_metadata(idx).unwrap(),
);
binlog_val_to_noria_val(val, kind, meta)
let charset = if kind.is_character_type() {
charset_iter.next().transpose()?.unwrap_or_default()
} else if kind.is_enum_or_set_type() {
enum_and_set_charset_iter
.next()
.transpose()?
.unwrap_or_default()
} else {
Default::default()
};
binlog_val_to_noria_val(val, kind, meta, charset)
}
BinlogValue::Jsonb(val) => {
let json: Result<serde_json::Value, _> = val.clone().try_into(); // urgh no TryFrom impl
Expand Down
1 change: 1 addition & 0 deletions replicators/src/mysql_connector/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod connector;
mod snapshot;
mod utils;

pub(crate) use connector::MySqlBinlogConnector;
pub(crate) use snapshot::MySqlReplicator;
31 changes: 30 additions & 1 deletion replicators/src/mysql_connector/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use itertools::Itertools;
use mysql::prelude::Queryable;
use mysql::{Transaction, TxOpts};
use mysql_async as mysql;
use mysql_common::constants::ColumnType;
use mysql_srv::ColumnFlags;
use nom_sql::{DialectDisplay, NonReplicatedRelation, NotReplicatedReason, Relation};
use readyset_client::recipe::changelist::{Change, ChangeList};
use readyset_data::Dialect;
Expand All @@ -21,6 +23,7 @@ use tokio::task::JoinHandle;
use tracing::{debug, error, info, info_span, warn};
use tracing_futures::Instrument;

use super::utils::mysql_pad_collation_column;
use crate::db_util::DatabaseSchemas;
use crate::table_filter::TableFilter;
use crate::TablesSnapshottingGaugeGuard;
Expand Down Expand Up @@ -767,7 +770,33 @@ fn mysql_row_to_noria_row(row: mysql::Row) -> ReadySetResult<Vec<readyset_data::
let mut noria_row = Vec::with_capacity(row.len());
for idx in 0..row.len() {
let val = value_to_value(row.as_ref(idx).unwrap());
noria_row.push(readyset_data::DfValue::try_from(val)?);
let col = row.columns_ref().get(idx).unwrap();
let flags = col.flags();
// ENUM and SET columns are stored as integers and retrieved as strings. We don't need
// padding.
let require_padding = col.column_type() == ColumnType::MYSQL_TYPE_STRING
&& !flags.contains(ColumnFlags::ENUM_FLAG)
&& !flags.contains(ColumnFlags::SET_FLAG);
match require_padding {
true => {
let bytes = match val.clone() {
mysql_common::value::Value::Bytes(b) => b,
_ => {
return Err(internal_err!(
"Expected MYSQL_TYPE_STRING column to be of value Bytes, got {:?}",
val
));
}
};
noria_row.push(mysql_pad_collation_column(
&bytes,
col.column_type(),
col.character_set(),
col.column_length() as usize,
));
}
false => noria_row.push(readyset_data::DfValue::try_from(val)?),
}
}
Ok(noria_row)
}
Expand Down
46 changes: 46 additions & 0 deletions replicators/src/mysql_connector/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::sync::Arc;

use mysql_common::collations::{self, Collation, CollationId};
use mysql_srv::ColumnType;
use readyset_data::DfValue;

/// Pad a MYSQL_TYPE_STRING (CHAR / BINARY) column value to the correct length for the given column
/// type and charset.
///
/// Parameters:
/// - `val`: The current column value as a vector of bytes.
/// - `col`: The column type.
/// - `collation`: The collation ID of the column.
/// - `col_len`: The length of the column in bytes.
///
/// Returns:
/// - A `DfValue` representing the padded column value - `CHAR` will return a `TinyText` or `Text`
/// and `BINARY` will return a `ByteArray`.
pub fn mysql_pad_collation_column(
val: &Vec<u8>,
col: ColumnType,
collation: u16,
col_len: usize,
) -> DfValue {
assert_eq!(col, ColumnType::MYSQL_TYPE_STRING);
let collation: Collation = collations::CollationId::from(collation).into();
match collation.id() {
CollationId::BINARY => {
if val.len() < col_len {
let mut padded = val.clone();
padded.extend(std::iter::repeat(0).take(col_len - val.len()));
return DfValue::ByteArray(Arc::new(padded));
}
DfValue::ByteArray(Arc::new(val.to_vec()))
}
_ => {
let column_length_characters = col_len / collation.max_len() as usize;
let mut str = String::from_utf8_lossy(val).to_string();
let str_len = str.chars().count();
if str_len < column_length_characters {
str.extend(std::iter::repeat(' ').take(column_length_characters - str_len));
}
DfValue::from(str)
}
}
}
Loading

0 comments on commit ee1a20e

Please sign in to comment.