From cea59379e94174ed47157bc681b89cb7d1de164c Mon Sep 17 00:00:00 2001 From: Sevenannn Date: Fri, 2 Aug 2024 11:54:14 -0700 Subject: [PATCH] Correctly detect binary / non binary column for MYSQL_TYPE_VAR_STRING and MYSQL_TYPE_STRING --- src/sql/arrow_sql_gen/mysql.rs | 38 ++++++++++++++++--- .../dbconnection/mysqlconn.rs | 10 ++++- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/src/sql/arrow_sql_gen/mysql.rs b/src/sql/arrow_sql_gen/mysql.rs index 8c6ac0e..3776775 100644 --- a/src/sql/arrow_sql_gen/mysql.rs +++ b/src/sql/arrow_sql_gen/mysql.rs @@ -11,7 +11,7 @@ use arrow::{ use bigdecimal::BigDecimal; use bigdecimal::ToPrimitive; use chrono::{NaiveDate, NaiveTime, Timelike}; -use mysql_async::{consts::ColumnType, FromValueError, Row, Value}; +use mysql_async::{consts::ColumnFlags, consts::ColumnType, FromValueError, Row, Value}; use snafu::{ResultExt, Snafu}; use std::{convert, sync::Arc}; @@ -93,13 +93,15 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result { let mut arrow_columns_builders: Vec>> = Vec::new(); let mut mysql_types: Vec = Vec::new(); let mut column_names: Vec = Vec::new(); + let mut column_is_binary_stats: Vec = Vec::new(); if !rows.is_empty() { let row = &rows[0]; for column in row.columns().iter() { let column_name = column.name_str(); let column_type = column.column_type(); - let data_type = map_column_to_data_type(column_type); + let column_is_binary = column.flags().contains(ColumnFlags::BINARY_FLAG); + let data_type = map_column_to_data_type(column_type, column_is_binary); arrow_fields.push( data_type .clone() @@ -109,6 +111,7 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result { .push(map_data_type_to_array_builder_optional(data_type.as_ref())); mysql_types.push(column_type); column_names.push(column_name.to_string()); + column_is_binary_stats.push(column_is_binary); } } @@ -275,7 +278,25 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result { } column_type @ (ColumnType::MYSQL_TYPE_STRING | ColumnType::MYSQL_TYPE_VAR_STRING) => { - handle_primitive_type!(builder, column_type, BinaryBuilder, Vec, row, i); + if column_is_binary_stats[i] { + handle_primitive_type!( + builder, + column_type, + BinaryBuilder, + Vec, + row, + i + ); + } else { + handle_primitive_type!( + builder, + column_type, + LargeStringBuilder, + String, + row, + i + ); + } } ColumnType::MYSQL_TYPE_DATE => { let Some(builder) = builder else { @@ -409,7 +430,7 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result { } #[allow(clippy::unnecessary_wraps)] -pub fn map_column_to_data_type(column_type: ColumnType) -> Option { +pub fn map_column_to_data_type(column_type: ColumnType, column_flag: bool) -> Option { match column_type { ColumnType::MYSQL_TYPE_NULL => Some(DataType::Null), ColumnType::MYSQL_TYPE_BIT => Some(DataType::UInt64), @@ -436,8 +457,13 @@ pub fn map_column_to_data_type(column_type: ColumnType) -> Option { | ColumnType::MYSQL_TYPE_MEDIUM_BLOB | ColumnType::MYSQL_TYPE_LONG_BLOB => Some(DataType::LargeUtf8), ColumnType::MYSQL_TYPE_STRING - | ColumnType::MYSQL_TYPE_VAR_STRING => Some(DataType::Binary), - + | ColumnType::MYSQL_TYPE_VAR_STRING => { + if column_flag { + Some(DataType::Binary) + } else { + Some(DataType::LargeUtf8) + } + }, // replication only ColumnType::MYSQL_TYPE_TYPED_ARRAY // internal diff --git a/src/sql/db_connection_pool/dbconnection/mysqlconn.rs b/src/sql/db_connection_pool/dbconnection/mysqlconn.rs index 7b5265d..bc806d3 100644 --- a/src/sql/db_connection_pool/dbconnection/mysqlconn.rs +++ b/src/sql/db_connection_pool/dbconnection/mysqlconn.rs @@ -167,6 +167,7 @@ fn columns_meta_to_schema(columns_meta: Vec) -> Result { })?; let column_type = map_str_type_to_column_type(&data_type)?; + let column_is_binary = map_str_type_to_is_binary(&data_type); let arrow_data_type = match column_type { // map_column_to_data_type does not support decimal mapping and uses special logic to handle conversion based on actual value @@ -177,7 +178,7 @@ fn columns_meta_to_schema(columns_meta: Vec) -> Result { // rows_to_arrow uses hardcoded precision 38 for decimal so we use it here as well DataType::Decimal128(38, scale) } - _ => map_column_to_data_type(column_type) + _ => map_column_to_data_type(column_type, column_is_binary) .context(UnsupportedDataTypeSnafu { data_type })?, }; fields.push(Field::new(&column_name, arrow_data_type, true)); @@ -234,6 +235,13 @@ fn map_str_type_to_column_type(data_type: &str) -> Result { Ok(column_type) } +fn map_str_type_to_is_binary(data_type: &str) -> bool { + if data_type.starts_with("binary") | data_type.starts_with("varbinary") { + return true; + } + false +} + fn extract_decimal_precision_and_scale(data_type: &str) -> Result<(u8, i8)> { let (start, end) = match (data_type.find('('), data_type.find(')')) { (Some(start), Some(end)) => (start, end),