From 2bcc0cfdbf128b94505b7310d680157f6b9f20cb Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Fri, 15 Apr 2022 05:52:03 -0700 Subject: [PATCH] initial commit (#1564) --- .../tests/test_sql.py | 38 +++++- arrow/src/array/ffi.rs | 116 +++++++++++++++- arrow/src/datatypes/ffi.rs | 28 +++- arrow/src/ffi.rs | 129 +++++++++++++++++- 4 files changed, 302 insertions(+), 9 deletions(-) diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index 058a32ea800e..9d5b93679b6b 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -61,9 +61,11 @@ def assert_pyarrow_leak(): pa.decimal128(19, 4), pa.string(), pa.binary(), + pa.binary(10), pa.large_string(), pa.large_binary(), pa.list_(pa.int32()), + pa.list_(pa.int32(), 2), pa.large_list(pa.uint16()), pa.struct( [ @@ -85,8 +87,6 @@ def assert_pyarrow_leak(): _unsupported_pyarrow_types = [ pa.decimal256(76, 38), pa.duration("s"), - pa.binary(10), - pa.list_(pa.int32(), 2), pa.map_(pa.string(), pa.int32()), pa.union( [pa.field("a", pa.binary(10)), pa.field("b", pa.string())], @@ -190,6 +190,29 @@ def test_time32_python(): del b del expected +def test_binary_array(): + """ + Python -> Rust -> Python + """ + a = pa.array(["a", None, "bb", "ccc"], pa.binary()) + b = rust.round_trip_array(a) + b.validate(full=True) + assert a.to_pylist() == b.to_pylist() + assert a.type == b.type + del a + del b + +def test_fixed_len_binary_array(): + """ + Python -> Rust -> Python + """ + a = pa.array(["aaa", None, "bbb", "ccc"], pa.binary(3)) + b = rust.round_trip_array(a) + b.validate(full=True) + assert a.to_pylist() == b.to_pylist() + assert a.type == b.type + del a + del b def test_list_array(): """ @@ -203,6 +226,17 @@ def test_list_array(): del a del b +def test_fixed_len_list_array(): + """ + Python -> Rust -> Python + """ + a = pa.array([[1, 2], None, [3, 4], [5, 6]], pa.list_(pa.int64(), 2)) + b = rust.round_trip_array(a) + b.validate(full=True) + assert a.to_pylist() == b.to_pylist() + assert a.type == b.type + del a + del b def test_timestamp_python(): """ diff --git a/arrow/src/array/ffi.rs b/arrow/src/array/ffi.rs index 976c6b8ce497..51da67c59e6c 100644 --- a/arrow/src/array/ffi.rs +++ b/arrow/src/array/ffi.rs @@ -45,12 +45,14 @@ impl TryFrom for ffi::ArrowArray { #[cfg(test)] mod tests { - use crate::array::{DictionaryArray, Int32Array, StringArray}; + use crate::array::{DictionaryArray, FixedSizeListArray, Int32Array, StringArray}; + use crate::buffer::Buffer; use crate::error::Result; + use crate::util::bit_util; use crate::{ array::{ - Array, ArrayData, BooleanArray, Int64Array, StructArray, UInt32Array, - UInt64Array, + Array, ArrayData, BooleanArray, FixedSizeBinaryArray, Int64Array, + StructArray, UInt32Array, UInt64Array, }, datatypes::{DataType, Field}, ffi::ArrowArray, @@ -149,4 +151,112 @@ mod tests { let data = array.data(); test_round_trip(data) } + + #[test] + fn test_fixed_size_binary() -> Result<()> { + let values = vec![vec![10, 10, 10], vec![20, 20, 20], vec![30, 30, 30]]; + let array = FixedSizeBinaryArray::try_from_iter(values.into_iter())?; + + let data = array.data(); + test_round_trip(data) + } + + #[test] + fn test_fixed_size_binary_with_nulls() -> Result<()> { + let values = vec![ + None, + Some(vec![10, 10, 10]), + None, + Some(vec![20, 20, 20]), + Some(vec![30, 30, 30]), + None, + ]; + let array = FixedSizeBinaryArray::try_from_sparse_iter(values.into_iter())?; + + let data = array.data(); + test_round_trip(data) + } + + #[test] + fn test_fixed_size_list() -> Result<()> { + let v: Vec = (0..9).into_iter().collect(); + let value_data = ArrayData::builder(DataType::Int64) + .len(9) + .add_buffer(Buffer::from_slice_ref(&v)) + .build()?; + let list_data_type = + DataType::FixedSizeList(Box::new(Field::new("f", DataType::Int64, false)), 3); + let list_data = ArrayData::builder(list_data_type) + .len(3) + .add_child_data(value_data) + .build()?; + let array = FixedSizeListArray::from(list_data); + + let data = array.data(); + test_round_trip(data) + } + + #[test] + fn test_fixed_size_list_with_nulls() -> Result<()> { + // 0100 0110 + let mut validity_bits: [u8; 1] = [0; 1]; + bit_util::set_bit(&mut validity_bits, 1); + bit_util::set_bit(&mut validity_bits, 2); + bit_util::set_bit(&mut validity_bits, 6); + + let v: Vec = (0..16).into_iter().collect(); + let value_data = ArrayData::builder(DataType::Int16) + .len(16) + .add_buffer(Buffer::from_slice_ref(&v)) + .build()?; + let list_data_type = + DataType::FixedSizeList(Box::new(Field::new("f", DataType::Int16, false)), 2); + let list_data = ArrayData::builder(list_data_type) + .len(8) + .null_bit_buffer(Buffer::from(validity_bits)) + .add_child_data(value_data) + .build()?; + let array = FixedSizeListArray::from(list_data); + + let data = array.data(); + test_round_trip(data) + } + + #[test] + fn test_fixed_size_list_nested() -> Result<()> { + let v: Vec = (0..16).into_iter().collect(); + let value_data = ArrayData::builder(DataType::Int32) + .len(16) + .add_buffer(Buffer::from_slice_ref(&v)) + .build()?; + + let offsets: Vec = vec![0, 2, 4, 6, 8, 10, 12, 14, 16]; + let value_offsets = Buffer::from_slice_ref(&offsets); + let inner_list_data_type = + DataType::List(Box::new(Field::new("item", DataType::Int32, false))); + let inner_list_data = ArrayData::builder(inner_list_data_type.clone()) + .len(8) + .add_buffer(value_offsets) + .add_child_data(value_data) + .build()?; + + // 0000 0100 + let mut validity_bits: [u8; 1] = [0; 1]; + bit_util::set_bit(&mut validity_bits, 2); + + let list_data_type = DataType::FixedSizeList( + Box::new(Field::new("f", inner_list_data_type, false)), + 2, + ); + let list_data = ArrayData::builder(list_data_type) + .len(4) + .null_bit_buffer(Buffer::from(validity_bits)) + .add_child_data(inner_list_data) + .build()?; + + let array = FixedSizeListArray::from(list_data); + + let data = array.data(); + test_round_trip(data) + } } diff --git a/arrow/src/datatypes/ffi.rs b/arrow/src/datatypes/ffi.rs index 10645fb68bb0..bc274e2dc3b8 100644 --- a/arrow/src/datatypes/ffi.rs +++ b/arrow/src/datatypes/ffi.rs @@ -67,6 +67,23 @@ impl TryFrom<&FFI_ArrowSchema> for DataType { // Parametrized types, requiring string parse other => { match other.splitn(2, ':').collect::>().as_slice() { + // FixedSizeBinary type in format "w:num_bytes" + ["w", num_bytes] => { + let parsed_num_bytes = num_bytes.parse::().map_err(|_| { + ArrowError::CDataInterface( + "FixedSizeBinary requires an integer parameter representing number of bytes per element".to_string()) + })?; + DataType::FixedSizeBinary(parsed_num_bytes) + }, + // FixedSizeList type in format "+w:num_elems" + ["+w", num_elems] => { + let c_child = c_schema.child(0); + let parsed_num_elems = num_elems.parse::().map_err(|_| { + ArrowError::CDataInterface( + "The FixedSizeList type requires an integer parameter representing number of elements per list".to_string()) + })?; + DataType::FixedSizeList(Box::new(Field::try_from(c_child)?), parsed_num_elems) + }, // Decimal types in format "d:precision,scale" or "d:precision,scale,bitWidth" ["d", extra] => { match extra.splitn(3, ',').collect::>().as_slice() { @@ -178,7 +195,9 @@ impl TryFrom<&DataType> for FFI_ArrowSchema { let format = get_format_string(dtype)?; // allocate and hold the children let children = match dtype { - DataType::List(child) | DataType::LargeList(child) => { + DataType::List(child) + | DataType::LargeList(child) + | DataType::FixedSizeList(child, _) => { vec![FFI_ArrowSchema::try_from(child.as_ref())?] } DataType::Struct(fields) => fields @@ -215,6 +234,8 @@ fn get_format_string(dtype: &DataType) -> Result { DataType::LargeBinary => Ok("Z".to_string()), DataType::Utf8 => Ok("u".to_string()), DataType::LargeUtf8 => Ok("U".to_string()), + DataType::FixedSizeBinary(num_bytes) => Ok(format!("w:{}", num_bytes)), + DataType::FixedSizeList(_, num_elems) => Ok(format!("+w:{}", num_elems)), DataType::Decimal(precision, scale) => Ok(format!("d:{},{}", precision, scale)), DataType::Date32 => Ok("tdD".to_string()), DataType::Date64 => Ok("tdm".to_string()), @@ -325,6 +346,11 @@ mod tests { round_trip_type(DataType::Float64)?; round_trip_type(DataType::Date64)?; round_trip_type(DataType::Time64(TimeUnit::Nanosecond))?; + round_trip_type(DataType::FixedSizeBinary(12))?; + round_trip_type(DataType::FixedSizeList( + Box::new(Field::new("a", DataType::Int64, false)), + 5, + ))?; round_trip_type(DataType::Utf8)?; round_trip_type(DataType::List(Box::new(Field::new( "a", diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index e507568014b6..ccd774ac4bec 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -333,6 +333,17 @@ fn bit_width(data_type: &DataType, i: usize) -> Result { data_type, i ))) } + (DataType::FixedSizeBinary(num_bytes), 1) => size_of::() * (*num_bytes as usize) * 8, + (DataType::FixedSizeList(f, num_elems), 1) => { + let child_bit_width = bit_width(f.data_type(), 1)?; + child_bit_width * (*num_elems as usize) + }, + (DataType::FixedSizeBinary(_), _) | (DataType::FixedSizeList(_, _), _) => { + return Err(ArrowError::CDataInterface(format!( + "The datatype \"{:?}\" expects 2 buffers, but requested {}. Please verify that the C data interface is correctly implemented.", + data_type, i + ))) + }, // Variable-sized binaries: have two buffers. // "small": first buffer is i32, second is in bytes (DataType::Utf8, 1) | (DataType::Binary, 1) | (DataType::List(_), 1) => size_of::() * 8, @@ -862,9 +873,10 @@ mod tests { use super::*; use crate::array::{ export_array_into_raw, make_array, Array, ArrayData, BinaryOffsetSizeTrait, - BooleanArray, DecimalArray, DictionaryArray, GenericBinaryArray, - GenericListArray, GenericStringArray, Int32Array, OffsetSizeTrait, - StringOffsetSizeTrait, Time32MillisecondArray, TimestampMillisecondArray, + BooleanArray, DecimalArray, DictionaryArray, FixedSizeBinaryArray, + FixedSizeListArray, GenericBinaryArray, GenericListArray, GenericStringArray, + Int32Array, OffsetSizeTrait, StringOffsetSizeTrait, Time32MillisecondArray, + TimestampMillisecondArray, }; use crate::compute::kernels; use crate::datatypes::{Field, Int8Type}; @@ -1175,6 +1187,117 @@ mod tests { Ok(()) } + #[test] + fn test_fixed_size_binary_array() -> Result<()> { + let values = vec![ + None, + Some(vec![10, 10, 10]), + None, + Some(vec![20, 20, 20]), + Some(vec![30, 30, 30]), + None, + ]; + let array = FixedSizeBinaryArray::try_from_sparse_iter(values.into_iter())?; + + // export it + let array = ArrowArray::try_from(array.data().clone())?; + + // (simulate consumer) import it + let data = ArrayData::try_from(array)?; + let array = make_array(data); + + // perform some operation + let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()]).unwrap(); + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + + // verify + assert_eq!( + array, + &FixedSizeBinaryArray::try_from_sparse_iter( + vec![ + None, + Some(vec![10, 10, 10]), + None, + Some(vec![20, 20, 20]), + Some(vec![30, 30, 30]), + None, + None, + Some(vec![10, 10, 10]), + None, + Some(vec![20, 20, 20]), + Some(vec![30, 30, 30]), + None, + ] + .into_iter() + )? + ); + + // (drop/release) + Ok(()) + } + + #[test] + fn test_fixed_size_list_array() -> Result<()> { + // 0000 0100 + let mut validity_bits: [u8; 1] = [0; 1]; + bit_util::set_bit(&mut validity_bits, 2); + + let v: Vec = (0..9).into_iter().collect(); + let value_data = ArrayData::builder(DataType::Int32) + .len(9) + .add_buffer(Buffer::from_slice_ref(&v)) + .build()?; + + let list_data_type = + DataType::FixedSizeList(Box::new(Field::new("f", DataType::Int32, false)), 3); + let list_data = ArrayData::builder(list_data_type.clone()) + .len(3) + .null_bit_buffer(Buffer::from(validity_bits)) + .add_child_data(value_data) + .build()?; + + // export it + let array = ArrowArray::try_from(list_data)?; + + // (simulate consumer) import it + let data = ArrayData::try_from(array)?; + let array = make_array(data); + + // perform some operation + let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()]).unwrap(); + let array = array.as_any().downcast_ref::().unwrap(); + + // 0010 0100 + let mut expected_validity_bits: [u8; 1] = [0; 1]; + bit_util::set_bit(&mut expected_validity_bits, 2); + bit_util::set_bit(&mut expected_validity_bits, 5); + + let mut w = vec![]; + w.extend_from_slice(&v); + w.extend_from_slice(&v); + + let expected_value_data = ArrayData::builder(DataType::Int32) + .len(18) + .add_buffer(Buffer::from_slice_ref(&w)) + .build()?; + + let expected_list_data = ArrayData::builder(list_data_type) + .len(6) + .null_bit_buffer(Buffer::from(expected_validity_bits)) + .add_child_data(expected_value_data) + .build()?; + let expected_array = FixedSizeListArray::from(expected_list_data); + + // verify + assert_eq!(array, &expected_array); + + // (drop/release) + Ok(()) + } + #[test] fn test_dictionary() -> Result<()> { // create an array natively