Skip to content

Commit

Permalink
Merge branch 'spiceai' into sgrebnov/sqlite-retries
Browse files Browse the repository at this point in the history
  • Loading branch information
sgrebnov authored Oct 1, 2024
2 parents 9ed6f1e + b9cfc4d commit 502f8fd
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 41 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ datafusion-federation-sql = { git = "https://github.com/spiceai/datafusion-feder
itertools = "0.13.0"
dyn-clone = { version = "1.0.17", optional = true }
geo-types = "0.7.13"
fundu = "2.0.1"

[dev-dependencies]
anyhow = "1.0.86"
Expand Down
6 changes: 5 additions & 1 deletion src/sql/arrow_sql_gen/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use arrow::{
TimestampNanosecondBuilder, TimestampSecondBuilder, UInt16Builder, UInt32Builder,
UInt64Builder, UInt8Builder,
},
datatypes::{DataType, TimeUnit},
datatypes::{DataType, TimeUnit, UInt16Type},
};

pub fn map_data_type_to_array_builder_optional(
Expand All @@ -21,6 +21,7 @@ pub fn map_data_type_to_array_builder_optional(
}
}

#[allow(clippy::too_many_lines)]
pub fn map_data_type_to_array_builder(data_type: &DataType) -> Box<dyn ArrayBuilder> {
match data_type {
DataType::Int8 => Box::new(Int8Builder::new()),
Expand Down Expand Up @@ -67,6 +68,9 @@ pub fn map_data_type_to_array_builder(data_type: &DataType) -> Box<dyn ArrayBuil
(DataType::Int8, DataType::Utf8) => {
Box::new(StringDictionaryBuilder::<Int8Type>::new())
}
(DataType::UInt16, DataType::Utf8) => {
Box::new(StringDictionaryBuilder::<UInt16Type>::new())
}
_ => unimplemented!("Unimplemented dictionary type"),
},
DataType::Date32 => Box::new(Date32Builder::new()),
Expand Down
63 changes: 53 additions & 10 deletions src/sql/arrow_sql_gen/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use arrow::{
ArrayBuilder, ArrayRef, BinaryBuilder, Date32Builder, Decimal256Builder, Decimal128Builder, Float32Builder,
Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, LargeBinaryBuilder,
LargeStringBuilder, NullBuilder, RecordBatch, RecordBatchOptions,
StringBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder, UInt64Builder,
StringBuilder, Time64NanosecondBuilder, StringDictionaryBuilder, TimestampMicrosecondBuilder, UInt64Builder,
},
datatypes::{i256, DataType, Date32Type, Field, Schema, SchemaRef, TimeUnit},
datatypes::{i256, DataType, Date32Type, Field, Schema, SchemaRef, TimeUnit, UInt16Type},
};
use bigdecimal::BigDecimal;
use chrono::{NaiveDate, NaiveTime, Timelike};
Expand Down Expand Up @@ -92,6 +92,7 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
let mut mysql_types: Vec<ColumnType> = Vec::new();
let mut column_names: Vec<String> = Vec::new();
let mut column_is_binary_stats: Vec<bool> = Vec::new();
let mut column_is_enum_stats: Vec<bool> = Vec::new();
let mut column_use_large_str_or_blob_stats: Vec<bool> = Vec::new();

if !rows.is_empty() {
Expand All @@ -100,6 +101,7 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
let column_name = column.name_str();
let column_type = column.column_type();
let column_is_binary = column.flags().contains(ColumnFlags::BINARY_FLAG);
let column_is_enum = column.flags().contains(ColumnFlags::ENUM_FLAG);
let column_use_large_str_or_blob = column.column_length() > 2_u32.pow(31) - 1;

let (decimal_precision, decimal_scale) = match column_type {
Expand All @@ -120,6 +122,7 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
let data_type = map_column_to_data_type(
column_type,
column_is_binary,
column_is_enum,
column_use_large_str_or_blob,
decimal_precision,
decimal_scale,
Expand All @@ -135,6 +138,7 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
mysql_types.push(column_type);
column_names.push(column_name.to_string());
column_is_binary_stats.push(column_is_binary);
column_is_enum_stats.push(column_is_enum);
column_use_large_str_or_blob_stats.push(column_use_large_str_or_blob);
}
}
Expand Down Expand Up @@ -299,8 +303,7 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
}
}
column_type @ (ColumnType::MYSQL_TYPE_VARCHAR
| ColumnType::MYSQL_TYPE_JSON
| ColumnType::MYSQL_TYPE_ENUM) => {
| ColumnType::MYSQL_TYPE_JSON) => {
handle_primitive_type!(
builder,
column_type,
Expand Down Expand Up @@ -349,9 +352,40 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
),
}
}
ColumnType::MYSQL_TYPE_ENUM => {
// ENUM and SET values are returned as strings. For these, check that the type value is MYSQL_TYPE_STRING and that the ENUM_FLAG or SET_FLAG flag is set in the flags value.
// https://dev.mysql.com/doc/c-api/9.0/en/c-api-data-structures.html
unreachable!()
}
column_type @ (ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_VAR_STRING) => {
if column_is_binary_stats[i] {
// Handle MYSQL_TYPE_ENUM value
if column_is_enum_stats[i] {
let Some(builder) = builder else {
return NoBuilderForIndexSnafu { index: i }.fail();
};
let Some(builder) = builder
.as_any_mut()
.downcast_mut::<StringDictionaryBuilder<UInt16Type>>()
else {
return FailedToDowncastBuilderSnafu {
mysql_type: format!("{mysql_type:?}"),
}
.fail();
};

let v = handle_null_error(row.get_opt::<String, usize>(i).transpose())
.context(FailedToGetRowValueSnafu {
mysql_type: ColumnType::MYSQL_TYPE_ENUM,
})?;

match v {
Some(v) => {
builder.append_value(v);
}
None => builder.append_null(),
}
} else if column_is_binary_stats[i] {
handle_primitive_type!(
builder,
column_type,
Expand All @@ -361,7 +395,14 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
i
);
} else {
handle_primitive_type!(builder, column_type, StringBuilder, String, row, i);
handle_primitive_type!(
builder,
column_type,
StringBuilder,
String,
row,
i
);
}
}
ColumnType::MYSQL_TYPE_DATE => {
Expand Down Expand Up @@ -463,6 +504,7 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
pub fn map_column_to_data_type(
column_type: ColumnType,
column_is_binary: bool,
column_is_enum: bool,
column_use_large_str_or_blob: bool,
column_decimal_precision: Option<u8>,
column_decimal_scale: Option<i8>,
Expand Down Expand Up @@ -491,9 +533,7 @@ pub fn map_column_to_data_type(
Some(DataType::Time64(TimeUnit::Nanosecond))
}
ColumnType::MYSQL_TYPE_VARCHAR
| ColumnType::MYSQL_TYPE_JSON
| ColumnType::MYSQL_TYPE_ENUM
| ColumnType::MYSQL_TYPE_SET => Some(DataType::LargeUtf8),
| ColumnType::MYSQL_TYPE_JSON => Some(DataType::LargeUtf8),
// MYSQL_TYPE_BLOB includes TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT https://dev.mysql.com/doc/c-api/8.0/en/c-api-data-structures.html
// MySQL String Type Storage requirement: https://dev.mysql.com/doc/refman/8.4/en/storage-requirements.html
// Binary / Utf8 stores up to 2^31 - 1 length binary / non-binary string
Expand All @@ -505,9 +545,12 @@ pub fn map_column_to_data_type(
(false, false) => Some(DataType::Utf8),
}
}
ColumnType::MYSQL_TYPE_ENUM | ColumnType::MYSQL_TYPE_SET => unreachable!(),
ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_VAR_STRING => {
if column_is_binary {
if column_is_enum {
Some(DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)))
} else if column_is_binary {
Some(DataType::Binary)
} else {
Some(DataType::Utf8)
Expand Down
14 changes: 12 additions & 2 deletions src/sql/db_connection_pool/dbconnection/mysqlconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ fn columns_meta_to_schema(columns_meta: Vec<Row>) -> Result<SchemaRef> {

let column_type = map_str_type_to_column_type(&data_type)?;
let column_is_binary = map_str_type_to_is_binary(&data_type);
let column_is_enum = map_str_type_to_is_enum(&data_type);
let column_use_large_str_or_blob = map_str_type_to_use_large_str_or_blob(&data_type);

let (precision, scale) = match column_type {
Expand All @@ -199,6 +200,7 @@ fn columns_meta_to_schema(columns_meta: Vec<Row>) -> Result<SchemaRef> {
let arrow_data_type = map_column_to_data_type(
column_type,
column_is_binary,
column_is_enum,
column_use_large_str_or_blob,
precision,
scale,
Expand Down Expand Up @@ -235,8 +237,9 @@ fn map_str_type_to_column_type(data_type: &str) -> Result<ColumnType> {
_ if data_type.starts_with("array") => ColumnType::MYSQL_TYPE_TYPED_ARRAY,
_ if data_type.starts_with("json") => ColumnType::MYSQL_TYPE_JSON,
_ if data_type.starts_with("newdecimal") => ColumnType::MYSQL_TYPE_NEWDECIMAL,
_ if data_type.starts_with("enum") => ColumnType::MYSQL_TYPE_ENUM,
_ if data_type.starts_with("set") => ColumnType::MYSQL_TYPE_SET,
// MySQL ENUM & SET value is exported as MYSQL_TYPE_STRING under c api: https://dev.mysql.com/doc/c-api/9.0/en/c-api-data-structures.html
_ if data_type.starts_with("enum") => ColumnType::MYSQL_TYPE_STRING,
_ if data_type.starts_with("set") => ColumnType::MYSQL_TYPE_STRING,
_ if data_type.starts_with("tinyblob") => ColumnType::MYSQL_TYPE_BLOB,
_ if data_type.starts_with("tinytext") => ColumnType::MYSQL_TYPE_BLOB,
_ if data_type.starts_with("mediumblob") => ColumnType::MYSQL_TYPE_BLOB,
Expand Down Expand Up @@ -276,6 +279,13 @@ fn map_str_type_to_use_large_str_or_blob(data_type: &str) -> bool {
false
}

fn map_str_type_to_is_enum(data_type: &str) -> bool {
if data_type.starts_with("enum") {
return true;
}
false
}

fn extract_decimal_precision_and_scale(data_type: &str) -> Result<(u8, i8)> {
let (start, end) = match (data_type.find('('), data_type.find(')')) {
(Some(start), Some(end)) => (start, end),
Expand Down
Loading

0 comments on commit 502f8fd

Please sign in to comment.