Skip to content

Commit

Permalink
Correctly detect binary / non binary column for MYSQL_TYPE_VAR_STRING…
Browse files Browse the repository at this point in the history
… and MYSQL_TYPE_STRING
  • Loading branch information
Sevenannn committed Aug 2, 2024
1 parent b7bd69f commit cea5937
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
38 changes: 32 additions & 6 deletions src/sql/arrow_sql_gen/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use arrow::{
use bigdecimal::BigDecimal;
use bigdecimal::ToPrimitive;
use chrono::{NaiveDate, NaiveTime, Timelike};
use mysql_async::{consts::ColumnType, FromValueError, Row, Value};
use mysql_async::{consts::ColumnFlags, consts::ColumnType, FromValueError, Row, Value};
use snafu::{ResultExt, Snafu};
use std::{convert, sync::Arc};

Expand Down Expand Up @@ -93,13 +93,15 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
let mut arrow_columns_builders: Vec<Option<Box<dyn ArrayBuilder>>> = Vec::new();
let mut mysql_types: Vec<ColumnType> = Vec::new();
let mut column_names: Vec<String> = Vec::new();
let mut column_is_binary_stats: Vec<bool> = Vec::new();

if !rows.is_empty() {
let row = &rows[0];
for column in row.columns().iter() {
let column_name = column.name_str();
let column_type = column.column_type();
let data_type = map_column_to_data_type(column_type);
let column_is_binary = column.flags().contains(ColumnFlags::BINARY_FLAG);
let data_type = map_column_to_data_type(column_type, column_is_binary);
arrow_fields.push(
data_type
.clone()
Expand All @@ -109,6 +111,7 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
.push(map_data_type_to_array_builder_optional(data_type.as_ref()));
mysql_types.push(column_type);
column_names.push(column_name.to_string());
column_is_binary_stats.push(column_is_binary);
}
}

Expand Down Expand Up @@ -275,7 +278,25 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
}
column_type @ (ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_VAR_STRING) => {
handle_primitive_type!(builder, column_type, BinaryBuilder, Vec<u8>, row, i);
if column_is_binary_stats[i] {
handle_primitive_type!(
builder,
column_type,
BinaryBuilder,
Vec<u8>,
row,
i
);
} else {
handle_primitive_type!(
builder,
column_type,
LargeStringBuilder,
String,
row,
i
);
}
}
ColumnType::MYSQL_TYPE_DATE => {
let Some(builder) = builder else {
Expand Down Expand Up @@ -409,7 +430,7 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
}

#[allow(clippy::unnecessary_wraps)]
pub fn map_column_to_data_type(column_type: ColumnType) -> Option<DataType> {
pub fn map_column_to_data_type(column_type: ColumnType, column_flag: bool) -> Option<DataType> {
match column_type {
ColumnType::MYSQL_TYPE_NULL => Some(DataType::Null),
ColumnType::MYSQL_TYPE_BIT => Some(DataType::UInt64),
Expand All @@ -436,8 +457,13 @@ pub fn map_column_to_data_type(column_type: ColumnType) -> Option<DataType> {
| ColumnType::MYSQL_TYPE_MEDIUM_BLOB
| ColumnType::MYSQL_TYPE_LONG_BLOB => Some(DataType::LargeUtf8),
ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_VAR_STRING => Some(DataType::Binary),

| ColumnType::MYSQL_TYPE_VAR_STRING => {
if column_flag {
Some(DataType::Binary)
} else {
Some(DataType::LargeUtf8)
}
},
// replication only
ColumnType::MYSQL_TYPE_TYPED_ARRAY
// internal
Expand Down
10 changes: 9 additions & 1 deletion src/sql/db_connection_pool/dbconnection/mysqlconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ fn columns_meta_to_schema(columns_meta: Vec<Row>) -> Result<SchemaRef> {
})?;

let column_type = map_str_type_to_column_type(&data_type)?;
let column_is_binary = map_str_type_to_is_binary(&data_type);

let arrow_data_type = match column_type {
// map_column_to_data_type does not support decimal mapping and uses special logic to handle conversion based on actual value
Expand All @@ -177,7 +178,7 @@ fn columns_meta_to_schema(columns_meta: Vec<Row>) -> Result<SchemaRef> {
// rows_to_arrow uses hardcoded precision 38 for decimal so we use it here as well
DataType::Decimal128(38, scale)
}
_ => map_column_to_data_type(column_type)
_ => map_column_to_data_type(column_type, column_is_binary)
.context(UnsupportedDataTypeSnafu { data_type })?,
};
fields.push(Field::new(&column_name, arrow_data_type, true));
Expand Down Expand Up @@ -234,6 +235,13 @@ fn map_str_type_to_column_type(data_type: &str) -> Result<ColumnType> {
Ok(column_type)
}

fn map_str_type_to_is_binary(data_type: &str) -> bool {
if data_type.starts_with("binary") | data_type.starts_with("varbinary") {
return true;
}
false
}

fn extract_decimal_precision_and_scale(data_type: &str) -> Result<(u8, i8)> {
let (start, end) = match (data_type.find('('), data_type.find(')')) {
(Some(start), Some(end)) => (start, end),
Expand Down

0 comments on commit cea5937

Please sign in to comment.