Skip to content

Commit

Permalink
Fix mysql blob types
Browse files Browse the repository at this point in the history
  • Loading branch information
Sevenannn committed Sep 25, 2024
1 parent 4615519 commit 37fea2b
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 22 deletions.
71 changes: 59 additions & 12 deletions src/sql/arrow_sql_gen/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use crate::sql::arrow_sql_gen::arrow::map_data_type_to_array_builder_optional;
use arrow::{
array::{
ArrayBuilder, ArrayRef, BinaryBuilder, Date32Builder, Decimal128Builder, Float32Builder,
Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, LargeStringBuilder,
NullBuilder, RecordBatch, RecordBatchOptions, Time64NanosecondBuilder,
TimestampMicrosecondBuilder, UInt64Builder,
Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, LargeBinaryBuilder,
LargeStringBuilder, NullBuilder, RecordBatch, RecordBatchOptions, StringBuilder,
Time64NanosecondBuilder, TimestampMicrosecondBuilder, UInt64Builder,
},
datatypes::{DataType, Date32Type, Field, Schema, SchemaRef, TimeUnit},
};
Expand Down Expand Up @@ -123,6 +123,7 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
column_is_binary,
decimal_precision,
decimal_scale,
column_name.as_ref(),
);

arrow_fields.push(
Expand Down Expand Up @@ -266,10 +267,6 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
}
column_type @ (ColumnType::MYSQL_TYPE_VARCHAR
| ColumnType::MYSQL_TYPE_JSON
| ColumnType::MYSQL_TYPE_TINY_BLOB
| ColumnType::MYSQL_TYPE_BLOB
| ColumnType::MYSQL_TYPE_MEDIUM_BLOB
| ColumnType::MYSQL_TYPE_LONG_BLOB
| ColumnType::MYSQL_TYPE_ENUM) => {
handle_primitive_type!(
builder,
Expand All @@ -280,6 +277,45 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
i
);
}
ColumnType::MYSQL_TYPE_BLOB => {
match (
column_names[i].starts_with("long"),
column_is_binary_stats[i],
) {
(true, true) => handle_primitive_type!(
builder,
ColumnType::MYSQL_TYPE_BLOB,
LargeBinaryBuilder,
Vec<u8>,
row,
i
),
(true, false) => handle_primitive_type!(
builder,
ColumnType::MYSQL_TYPE_BLOB,
LargeStringBuilder,
String,
row,
i
),
(false, true) => handle_primitive_type!(
builder,
ColumnType::MYSQL_TYPE_BLOB,
BinaryBuilder,
Vec<u8>,
row,
i
),
(false, false) => handle_primitive_type!(
builder,
ColumnType::MYSQL_TYPE_BLOB,
StringBuilder,
String,
row,
i
),
}
}
column_type @ (ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_VAR_STRING) => {
if column_is_binary_stats[i] {
Expand Down Expand Up @@ -403,6 +439,7 @@ pub fn map_column_to_data_type(
column_is_binary: bool,
column_decimal_precision: Option<u8>,
column_decimal_scale: Option<i8>,
column_name: &str,
) -> Option<DataType> {
match column_type {
ColumnType::MYSQL_TYPE_NULL => Some(DataType::Null),
Expand All @@ -425,11 +462,18 @@ pub fn map_column_to_data_type(
ColumnType::MYSQL_TYPE_VARCHAR
| ColumnType::MYSQL_TYPE_JSON
| ColumnType::MYSQL_TYPE_ENUM
| ColumnType::MYSQL_TYPE_SET
| ColumnType::MYSQL_TYPE_TINY_BLOB
| ColumnType::MYSQL_TYPE_BLOB
| ColumnType::MYSQL_TYPE_MEDIUM_BLOB
| ColumnType::MYSQL_TYPE_LONG_BLOB => Some(DataType::LargeUtf8),
| ColumnType::MYSQL_TYPE_SET => Some(DataType::LargeUtf8),
// MYSQL_TYPE_BLOB includes TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT https://dev.mysql.com/doc/c-api/8.0/en/c-api-data-structures.html
// MySQL String Type Storage requirement: https://dev.mysql.com/doc/refman/8.4/en/storage-requirements.html
// Binary / Utf8 stores up to 2^31 - 1 length binary / non-binary string
ColumnType::MYSQL_TYPE_BLOB => {
match (column_name.starts_with("long"), column_is_binary) {
(true, true) => Some(DataType::LargeBinary),
(true, false) => Some(DataType::LargeUtf8),
(false, true) => Some(DataType::Binary),
(false, false) => Some(DataType::Utf8),
}
}
ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_VAR_STRING => {
if column_is_binary {
Expand All @@ -447,6 +491,9 @@ pub fn map_column_to_data_type(
| ColumnType::MYSQL_TYPE_TIMESTAMP2
| ColumnType::MYSQL_TYPE_DATETIME2
| ColumnType::MYSQL_TYPE_TIME2
| ColumnType::MYSQL_TYPE_LONG_BLOB
| ColumnType::MYSQL_TYPE_TINY_BLOB
| ColumnType::MYSQL_TYPE_MEDIUM_BLOB
| ColumnType::MYSQL_TYPE_GEOMETRY => {
unimplemented!("Unsupported column type {:?}", column_type)
}
Expand Down
31 changes: 21 additions & 10 deletions src/sql/db_connection_pool/dbconnection/mysqlconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,14 @@ fn columns_meta_to_schema(columns_meta: Vec<Row>) -> Result<SchemaRef> {
_ => (None, None),
};

let arrow_data_type =
map_column_to_data_type(column_type, column_is_binary, precision, scale)
.context(UnsupportedDataTypeSnafu { data_type })?;
let arrow_data_type = map_column_to_data_type(
column_type,
column_is_binary,
precision,
scale,
&column_name,
)
.context(UnsupportedDataTypeSnafu { data_type })?;

fields.push(Field::new(&column_name, arrow_data_type, true));
}
Expand Down Expand Up @@ -232,12 +237,12 @@ fn map_str_type_to_column_type(data_type: &str) -> Result<ColumnType> {
_ if data_type.starts_with("newdecimal") => ColumnType::MYSQL_TYPE_NEWDECIMAL,
_ if data_type.starts_with("enum") => ColumnType::MYSQL_TYPE_ENUM,
_ if data_type.starts_with("set") => ColumnType::MYSQL_TYPE_SET,
_ if data_type.starts_with("tinyblob") => ColumnType::MYSQL_TYPE_TINY_BLOB,
_ if data_type.starts_with("tinytext") => ColumnType::MYSQL_TYPE_TINY_BLOB,
_ if data_type.starts_with("mediumblob") => ColumnType::MYSQL_TYPE_MEDIUM_BLOB,
_ if data_type.starts_with("mediumtext") => ColumnType::MYSQL_TYPE_MEDIUM_BLOB,
_ if data_type.starts_with("longblob") => ColumnType::MYSQL_TYPE_LONG_BLOB,
_ if data_type.starts_with("longtext") => ColumnType::MYSQL_TYPE_LONG_BLOB,
_ if data_type.starts_with("tinyblob") => ColumnType::MYSQL_TYPE_BLOB,
_ if data_type.starts_with("tinytext") => ColumnType::MYSQL_TYPE_BLOB,
_ if data_type.starts_with("mediumblob") => ColumnType::MYSQL_TYPE_BLOB,
_ if data_type.starts_with("mediumtext") => ColumnType::MYSQL_TYPE_BLOB,
_ if data_type.starts_with("longblob") => ColumnType::MYSQL_TYPE_BLOB,
_ if data_type.starts_with("longtext") => ColumnType::MYSQL_TYPE_BLOB,
_ if data_type.starts_with("blob") => ColumnType::MYSQL_TYPE_BLOB,
_ if data_type.starts_with("text") => ColumnType::MYSQL_TYPE_BLOB,
_ if data_type.starts_with("varbinary") => ColumnType::MYSQL_TYPE_VAR_STRING,
Expand All @@ -251,7 +256,13 @@ fn map_str_type_to_column_type(data_type: &str) -> Result<ColumnType> {
}

fn map_str_type_to_is_binary(data_type: &str) -> bool {
if data_type.starts_with("binary") | data_type.starts_with("varbinary") {
if data_type.starts_with("binary")
| data_type.starts_with("varbinary")
| data_type.starts_with("tinyblob")
| data_type.starts_with("mediumblob")
| data_type.starts_with("blob")
| data_type.starts_with("longblob")
{
return true;
}
false
Expand Down
63 changes: 63 additions & 0 deletions tests/mysql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,68 @@ VALUES
.await;
}

async fn test_mysql_blob_types(port: usize) {
let create_table_stmt = "
CREATE TABLE blobs_table (
tinyblob_col TINYBLOB,
tinytext_col TINYTEXT,
mediumblob_col MEDIUMBLOB,
mediumtext_col MEDIUMTEXT,
blob_col BLOB,
text_col TEXT,
longblob_col LONGBLOB,
longtext_col LONGTEXT
);
";
let insert_table_stmt = "
INSERT INTO blobs_table (
tinyblob_col, tinytext_col, mediumblob_col, mediumtext_col, blob_col, text_col, longblob_col, longtext_col
)
VALUES
(
'small_blob', 'small_text',
'medium_blob', 'medium_text',
'larger_blob', 'larger_text',
'very_large_blob', 'very_large_text'
);
";

let schema = Arc::new(Schema::new(vec![
Field::new("tinyblob_col", DataType::Binary, true),
Field::new("tinytext_col", DataType::Utf8, true),
Field::new("mediumblob_col", DataType::Binary, true),
Field::new("mediumtext_col", DataType::Utf8, true),
Field::new("blob_col", DataType::Binary, true),
Field::new("text_col", DataType::Utf8, true),
Field::new("longblob_col", DataType::LargeBinary, true),
Field::new("longtext_col", DataType::LargeUtf8, true),
]));

let expected_record = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(BinaryArray::from_vec(vec![b"small_blob"])),
Arc::new(StringArray::from(vec!["small_text"])),
Arc::new(BinaryArray::from_vec(vec![b"medium_blob"])),
Arc::new(StringArray::from(vec!["medium_text"])),
Arc::new(BinaryArray::from_vec(vec![b"larger_blob"])),
Arc::new(StringArray::from(vec!["larger_text"])),
Arc::new(LargeBinaryArray::from_vec(vec![b"very_large_blob"])),
Arc::new(LargeStringArray::from(vec!["very_large_text"])),
],
)
.expect("Failed to created arrow record batch");

arrow_mysql_one_way(
port,
"blobs_table",
create_table_stmt,
insert_table_stmt,
expected_record,
)
.await;
}

async fn arrow_mysql_one_way(
port: usize,
table_name: &str,
Expand Down Expand Up @@ -383,6 +445,7 @@ async fn test_mysql_arrow_oneway() {
test_mysql_timestamp_types(port).await;
test_mysql_datetime_types(port).await;
test_time_types(port).await;
test_mysql_blob_types(port).await;

mysql_container.remove().await.expect("container to stop");
}

0 comments on commit 37fea2b

Please sign in to comment.