chore: change binary array type from LargeBinaryArray to BinaryArray #3924

Merged (12 commits) on May 18, 2024
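In arrow terms, the change swaps the i64-offset binary layout for the default i32-offset one. A minimal sketch of the distinction, written against the public `arrow` crate API (illustrative only, not code from this PR):

```rust
use arrow::array::{Array, BinaryArray, LargeBinaryArray};
use arrow::compute;
use arrow::datatypes::DataType;

fn main() {
    // BinaryArray addresses its value buffer with i32 offsets (so a single
    // array tops out around 2 GiB of payload); LargeBinaryArray uses i64.
    let small = BinaryArray::from_iter_values(vec!["hello".as_bytes(), "world".as_bytes()]);
    let large = LargeBinaryArray::from_iter_values(vec!["hello".as_bytes(), "world".as_bytes()]);
    assert_eq!(&DataType::Binary, small.data_type());
    assert_eq!(&DataType::LargeBinary, large.data_type());

    // Casting between the two rewrites only the offsets; the cast errors if
    // the payload cannot fit the narrower offset type.
    let narrowed = compute::cast(&large, &DataType::Binary).unwrap();
    assert_eq!(&DataType::Binary, narrowed.data_type());
}
```

Everything else in the diff (the `ScalarValue` variants, the helper casts, the SST compatibility test) follows from that one alias change.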
4 changes: 2 additions & 2 deletions src/datatypes/src/arrow_array.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-pub type BinaryArray = arrow::array::LargeBinaryArray;
-pub type MutableBinaryArray = arrow::array::LargeBinaryBuilder;
+pub type BinaryArray = arrow::array::BinaryArray;
+pub type MutableBinaryArray = arrow::array::BinaryBuilder;
pub type StringArray = arrow::array::StringArray;
pub type MutableStringArray = arrow::array::StringBuilder;
2 changes: 1 addition & 1 deletion src/datatypes/src/types/binary_type.rs
@@ -47,7 +47,7 @@ impl DataType for BinaryType {
    }

    fn as_arrow_type(&self) -> ArrowDataType {
-        ArrowDataType::LargeBinary
+        ArrowDataType::Binary
    }

    fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
8 changes: 4 additions & 4 deletions src/datatypes/src/value.rs
@@ -342,7 +342,7 @@ impl Value {
            Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
            Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
            Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
-            Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())),
+            Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
            Value::Date(v) => ScalarValue::Date32(Some(v.val())),
            Value::DateTime(v) => ScalarValue::Date64(Some(v.val())),
            Value::Null => to_null_scalar_value(output_type)?,
@@ -413,7 +413,7 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValu
        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(None),
        ConcreteDataType::Float32(_) => ScalarValue::Float32(None),
        ConcreteDataType::Float64(_) => ScalarValue::Float64(None),
-        ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None),
+        ConcreteDataType::Binary(_) => ScalarValue::Binary(None),
        ConcreteDataType::String(_) => ScalarValue::Utf8(None),
        ConcreteDataType::Date(_) => ScalarValue::Date32(None),
        ConcreteDataType::DateTime(_) => ScalarValue::Date64(None),
@@ -2105,7 +2105,7 @@ mod tests {
            .unwrap()
        );
        assert_eq!(
-            ScalarValue::LargeBinary(Some("world".as_bytes().to_vec())),
+            ScalarValue::Binary(Some("world".as_bytes().to_vec())),
            Value::Binary(Bytes::from("world".as_bytes()))
                .try_to_scalar_value(&ConcreteDataType::binary_datatype())
                .unwrap()
@@ -2187,7 +2187,7 @@ mod tests {
            .unwrap()
        );
        assert_eq!(
-            ScalarValue::LargeBinary(None),
+            ScalarValue::Binary(None),
            Value::Null
                .try_to_scalar_value(&ConcreteDataType::binary_datatype())
                .unwrap()
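For the `ScalarValue` changes above, a small illustrative check of how DataFusion's binary scalars map to arrow types (a sketch assuming `datafusion_common`'s re-exported `arrow`, not part of the PR):

```rust
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::ScalarValue;

fn main() {
    // A non-null binary Value now becomes ScalarValue::Binary, whose arrow
    // type is DataType::Binary rather than DataType::LargeBinary.
    let scalar = ScalarValue::Binary(Some(b"world".to_vec()));
    assert_eq!(DataType::Binary, scalar.data_type());

    // The null case matches what to_null_scalar_value returns for
    // ConcreteDataType::Binary.
    assert_eq!(DataType::Binary, ScalarValue::Binary(None).data_type());
}
```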
10 changes: 9 additions & 1 deletion src/datatypes/src/vectors/binary.rs
@@ -52,6 +52,14 @@ impl From<Vec<Option<Vec<u8>>>> for BinaryVector {
    }
}

+impl From<Vec<&[u8]>> for BinaryVector {
+    fn from(data: Vec<&[u8]>) -> Self {
+        Self {
+            array: BinaryArray::from_iter_values(data),
+        }
+    }
+}
+
impl Vector for BinaryVector {
    fn data_type(&self) -> ConcreteDataType {
        ConcreteDataType::binary_datatype()
@@ -257,7 +265,7 @@

        let arrow_arr = v.to_arrow_array();
        assert_eq!(2, arrow_arr.len());
-        assert_eq!(&ArrowDataType::LargeBinary, arrow_arr.data_type());
+        assert_eq!(&ArrowDataType::Binary, arrow_arr.data_type());
    }

    #[test]
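The new `From<Vec<&[u8]>>` impl mainly serves tests; hypothetically, usage looks like this (a sketch assuming the `datatypes` crate paths below, which are not shown in the diff):

```rust
use datatypes::vectors::{BinaryVector, Vector};

fn main() {
    // Borrowed slices are copied into one contiguous, non-nullable array.
    let vector = BinaryVector::from(vec!["hello".as_bytes(), "world".as_bytes()]);
    assert_eq!(2, vector.len());
    assert_eq!(0, vector.null_count());
}
```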
62 changes: 53 additions & 9 deletions src/datatypes/src/vectors/helper.rs
@@ -258,9 +258,9 @@ impl Helper {
        Ok(match array.as_ref().data_type() {
            ArrowDataType::Null => Arc::new(NullVector::try_from_arrow_array(array)?),
            ArrowDataType::Boolean => Arc::new(BooleanVector::try_from_arrow_array(array)?),
-            ArrowDataType::LargeBinary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
-            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Binary => {
-                let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::LargeBinary)
+            ArrowDataType::Binary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
+            ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
+                let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Binary)
                    .context(crate::error::ArrowComputeSnafu)?;
                Arc::new(BinaryVector::try_from_arrow_array(array)?)
            }
@@ -278,7 +278,7 @@ impl Helper {
            ArrowDataType::LargeUtf8 => {
                let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Utf8)
                    .context(crate::error::ArrowComputeSnafu)?;
-                Arc::new(BinaryVector::try_from_arrow_array(array)?)
+                Arc::new(StringVector::try_from_arrow_array(array)?)
            }
            ArrowDataType::Date32 => Arc::new(DateVector::try_from_arrow_array(array)?),
            ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?),
@@ -402,8 +402,10 @@ mod tests {
        TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
        TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
    };
+    use arrow::buffer::Buffer;
    use arrow::datatypes::Int32Type;
-    use arrow_array::DictionaryArray;
+    use arrow_array::{BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeStringArray};
+    use arrow_schema::DataType;
    use common_decimal::Decimal128;
    use common_time::time::Time;
    use common_time::timestamp::TimeUnit;
@@ -576,10 +578,6 @@
    fn test_try_into_vector() {
        check_try_into_vector(NullArray::new(2));
        check_try_into_vector(BooleanArray::from(vec![true, false]));
-        check_try_into_vector(LargeBinaryArray::from(vec![
-            "hello".as_bytes(),
-            "world".as_bytes(),
-        ]));
        check_try_into_vector(Int8Array::from(vec![1, 2, 3]));
        check_try_into_vector(Int16Array::from(vec![1, 2, 3]));
        check_try_into_vector(Int32Array::from(vec![1, 2, 3]));
@@ -611,6 +609,52 @@
        Helper::try_into_vector(array).unwrap_err();
    }

+    #[test]
+    fn test_try_binary_array_into_vector() {
+        let input_vec: Vec<&[u8]> = vec!["hello".as_bytes(), "world".as_bytes()];
+        let assertion_vector = BinaryVector::from(input_vec.clone());
+
+        let input_arrays: Vec<ArrayRef> = vec![
+            Arc::new(LargeBinaryArray::from(input_vec.clone())) as ArrayRef,
+            Arc::new(BinaryArray::from(input_vec.clone())) as ArrayRef,
+            Arc::new(FixedSizeBinaryArray::new(
+                5,
+                Buffer::from_vec("helloworld".as_bytes().to_vec()),
+                None,
+            )) as ArrayRef,
+        ];
+
+        for input_array in input_arrays {
+            let vector = Helper::try_into_vector(input_array).unwrap();
+
+            assert_eq!(2, vector.len());
+            assert_eq!(0, vector.null_count());
+
+            let output_arrow_array: ArrayRef = vector.to_arrow_array();
+            assert_eq!(&DataType::Binary, output_arrow_array.data_type());
+            assert_eq!(&assertion_vector.to_arrow_array(), &output_arrow_array);
+        }
+    }
+
+    #[test]
+    fn test_large_string_array_into_vector() {
+        let input_vec = vec!["a", "b"];
+        let assertion_array = StringArray::from(input_vec.clone());
+
+        let large_string_array: ArrayRef = Arc::new(LargeStringArray::from(input_vec));
+        let vector = Helper::try_into_vector(large_string_array).unwrap();
+        assert_eq!(2, vector.len());
+        assert_eq!(0, vector.null_count());
+
+        let output_arrow_array: StringArray = vector
+            .to_arrow_array()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap()
+            .clone();
+        assert_eq!(&assertion_array, &output_arrow_array);
+    }
+
    #[test]
    fn test_try_from_scalar_time_value() {
        let vector = Helper::try_from_scalar_value(ScalarValue::Time32Second(Some(42)), 3).unwrap();
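To make the new normalization in `Helper::try_into_vector` concrete: both non-default arrow binary layouts are funneled through `arrow::compute::cast` into `Binary`. A standalone sketch using only public arrow APIs (with the caveat that narrowing i64 offsets to i32 should fail, rather than truncate, once an array exceeds `i32::MAX` bytes of payload; that is my reading of the cast kernel, not something this diff exercises):

```rust
use arrow::array::{Array, FixedSizeBinaryArray, LargeBinaryArray};
use arrow::compute;
use arrow::datatypes::DataType;

fn main() {
    let large = LargeBinaryArray::from_iter_values(vec!["hello".as_bytes(), "world".as_bytes()]);
    let fixed = FixedSizeBinaryArray::try_from_iter(vec![*b"hello", *b"world"].into_iter()).unwrap();

    // Mirrors the `LargeBinary | FixedSizeBinary(_)` match arm above.
    for array in [
        compute::cast(&large, &DataType::Binary).unwrap(),
        compute::cast(&fixed, &DataType::Binary).unwrap(),
    ] {
        assert_eq!(&DataType::Binary, array.data_type());
        assert_eq!(2, array.len());
    }
}
```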
4 changes: 0 additions & 4 deletions src/mito2/src/memtable/partition_tree/dict.rs
@@ -281,10 +281,6 @@ impl Drop for KeyDict {

/// Buffer to store unsorted primary keys.
struct KeyBuffer {
-    // We use arrow's binary builder as out default binary builder
-    // is LargeBinaryBuilder
-    // TODO(yingwen): Change the type binary vector to Binary instead of LargeBinary.
-    /// Builder for binary key array.
    key_builder: BinaryBuilder,
    next_pk_index: usize,
}
101 changes: 99 additions & 2 deletions src/mito2/src/sst/parquet.rs
@@ -79,19 +79,28 @@ pub struct SstInfo {
mod tests {
    use std::sync::Arc;

+    use common_datasource::file_format::parquet::BufferedWriter;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
+    use datatypes::arrow;
+    use datatypes::arrow::array::RecordBatch;
+    use datatypes::arrow::datatypes::{DataType, Field, Schema};
+    use parquet::basic::{Compression, Encoding, ZstdLevel};
+    use parquet::file::metadata::KeyValue;
+    use parquet::file::properties::WriterProperties;
    use table::predicate::Predicate;

    use super::*;
    use crate::cache::{CacheManager, PageKey};
    use crate::sst::index::Indexer;
+    use crate::sst::parquet::format::WriteFormat;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::sst::parquet::writer::ParquetWriter;
+    use crate::sst::DEFAULT_WRITE_CONCURRENCY;
    use crate::test_util::sst_util::{
-        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
-        sst_region_metadata,
+        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
+        new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
    };
    use crate::test_util::{check_reader_result, TestEnv};

@@ -399,4 +408,92 @@ mod tests {
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }
+
+    #[tokio::test]
+    async fn test_read_large_binary() {
+        let mut env = TestEnv::new();
+        let object_store = env.init_object_store_manager();
+        let handle = sst_file_handle(0, 1000);
+        let file_path = handle.file_path(FILE_DIR);
+
+        let write_opts = WriteOptions {
+            row_group_size: 50,
+            ..Default::default()
+        };
+
+        let metadata = build_test_binary_test_region_metadata();
+        let json = metadata.to_json().unwrap();
+        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
+
+        let props_builder = WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![key_value_meta]))
+            .set_compression(Compression::ZSTD(ZstdLevel::default()))
+            .set_encoding(Encoding::PLAIN)
+            .set_max_row_group_size(write_opts.row_group_size);
+
+        let writer_props = props_builder.build();
+
+        let write_format = WriteFormat::new(metadata);
+        let fields: Vec<_> = write_format
+            .arrow_schema()
+            .fields()
+            .into_iter()
+            .map(|field| {
+                let data_type = field.data_type().clone();
+                if data_type == DataType::Binary {
+                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
+                } else {
+                    Field::new(field.name(), data_type, field.is_nullable())
+                }
+            })
+            .collect();
+
+        let arrow_schema = Arc::new(Schema::new(fields));
+
+        // Ensures field_0 has LargeBinary type.
+        assert_eq!(
+            &DataType::LargeBinary,
+            arrow_schema.field_with_name("field_0").unwrap().data_type()
+        );
+        let mut buffered_writer = BufferedWriter::try_new(
+            file_path.clone(),
+            object_store.clone(),
+            arrow_schema.clone(),
+            Some(writer_props),
+            write_opts.write_buffer_size.as_bytes() as usize,
+            DEFAULT_WRITE_CONCURRENCY,
+        )
+        .await
+        .unwrap();
+
+        let batch = new_batch_with_binary(&["a"], 0, 60);
+        let arrow_batch = write_format.convert_batch(&batch).unwrap();
+        let arrays: Vec<_> = arrow_batch
+            .columns()
+            .iter()
+            .map(|array| {
+                let data_type = array.data_type().clone();
+                if data_type == DataType::Binary {
+                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
+                } else {
+                    array.clone()
+                }
+            })
+            .collect();
+        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
+
+        buffered_writer.write(&result).await.unwrap();
+        buffered_writer.close().await.unwrap();
+
+        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
+        let mut reader = builder.build().await.unwrap();
+        check_reader_result(
+            &mut reader,
+            &[
+                new_batch_with_binary(&["a"], 0, 50),
+                new_batch_with_binary(&["a"], 50, 60),
+            ],
+        )
+        .await;
+    }
}