From 60ec869779ebcf44a4f757aff6eaa22a7fd5e4ed Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 12 Sep 2024 09:26:57 -0400 Subject: [PATCH] Support StringViewArray interop with python: fix lingering C Data Interface issues for *ViewArray (#6368) * fix lingering C Data Interface issues for *ViewArray Fixes https://github.com/apache/arrow-rs/issues/6366 * report views length in elements -> bytes * use pyarrow 17 * use only good versions * fix support for View arrays in C FFI, add test * update comment in github action * more ffi test cases * more byte_view tests for into_pyarrow --- .github/workflows/integration.yml | 7 +- arrow-array/src/ffi.rs | 168 ++++++++++++++++++++++++--- arrow-buffer/src/buffer/immutable.rs | 7 +- arrow-data/src/ffi.rs | 20 +++- arrow/src/pyarrow.rs | 2 +- arrow/tests/pyarrow.rs | 69 ++++++++++- 6 files changed, 247 insertions(+), 26 deletions(-) diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 1937fafe3a62..41edc1bb194e 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -48,7 +48,6 @@ on: - arrow/** jobs: - integration: name: Archery test With other arrows runs-on: ubuntu-latest @@ -118,9 +117,9 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - rust: [ stable ] - # PyArrow 13 was the last version prior to introduction to Arrow PyCapsules - pyarrow: [ "13", "14" ] + rust: [stable] + # PyArrow 15 was the first version to introduce StringView/BinaryView support + pyarrow: ["15", "16", "17"] steps: - uses: actions/checkout@v4 with: diff --git a/arrow-array/src/ffi.rs b/arrow-array/src/ffi.rs index 1d76ed62d365..a28b3f746115 100644 --- a/arrow-array/src/ffi.rs +++ b/arrow-array/src/ffi.rs @@ -193,6 +193,13 @@ fn bit_width(data_type: &DataType, i: usize) -> Result { "The datatype \"{data_type:?}\" expects 3 buffers, but requested {i}. Please verify that the C data interface is correctly implemented." ))) } + // Variable-sized views: have 3 or more buffers. + // Buffer 1 are the u128 views + // Buffers 2...N-1 are u8 byte buffers + (DataType::Utf8View, 1) | (DataType::BinaryView,1) => u128::BITS as _, + (DataType::Utf8View, _) | (DataType::BinaryView, _) => { + u8::BITS as _ + } // type ids. UnionArray doesn't have null bitmap so buffer index begins with 0. (DataType::Union(_, _), 0) => i8::BITS as _, // Only DenseUnion has 2nd buffer @@ -300,7 +307,7 @@ impl<'a> ImportedArrowArray<'a> { }; let data_layout = layout(&self.data_type); - let buffers = self.buffers(data_layout.can_contain_null_mask)?; + let buffers = self.buffers(data_layout.can_contain_null_mask, data_layout.variadic)?; let null_bit_buffer = if data_layout.can_contain_null_mask { self.null_bit_buffer() @@ -373,13 +380,30 @@ impl<'a> ImportedArrowArray<'a> { /// returns all buffers, as organized by Rust (i.e. null buffer is skipped if it's present /// in the spec of the type) - fn buffers(&self, can_contain_null_mask: bool) -> Result> { + fn buffers(&self, can_contain_null_mask: bool, variadic: bool) -> Result> { // + 1: skip null buffer let buffer_begin = can_contain_null_mask as usize; - (buffer_begin..self.array.num_buffers()) - .map(|index| { - let len = self.buffer_len(index, &self.data_type)?; + let buffer_end = self.array.num_buffers() - usize::from(variadic); + + let variadic_buffer_lens = if variadic { + // Each views array has 1 (optional) null buffer, 1 views buffer, 1 lengths buffer. + // Rest are variadic. + let num_variadic_buffers = + self.array.num_buffers() - (2 + usize::from(can_contain_null_mask)); + if num_variadic_buffers == 0 { + &[] + } else { + let lengths = self.array.buffer(self.array.num_buffers() - 1); + // SAFETY: is lengths is non-null, then it must be valid for up to num_variadic_buffers. + unsafe { std::slice::from_raw_parts(lengths.cast::(), num_variadic_buffers) } + } + } else { + &[] + }; + (buffer_begin..buffer_end) + .map(|index| { + let len = self.buffer_len(index, variadic_buffer_lens, &self.data_type)?; match unsafe { create_buffer(self.owner.clone(), self.array, index, len) } { Some(buf) => Ok(buf), None if len == 0 => { @@ -399,7 +423,12 @@ impl<'a> ImportedArrowArray<'a> { /// Rust implementation uses fixed-sized buffers, which require knowledge of their `len`. /// for variable-sized buffers, such as the second buffer of a stringArray, we need /// to fetch offset buffer's len to build the second buffer. - fn buffer_len(&self, i: usize, dt: &DataType) -> Result { + fn buffer_len( + &self, + i: usize, + variadic_buffer_lengths: &[i64], + dt: &DataType, + ) -> Result { // Special handling for dictionary type as we only care about the key type in the case. let data_type = match dt { DataType::Dictionary(key_data_type, _) => key_data_type.as_ref(), @@ -430,7 +459,7 @@ impl<'a> ImportedArrowArray<'a> { } // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) - let len = self.buffer_len(1, dt)?; + let len = self.buffer_len(1, variadic_buffer_lengths, dt)?; // first buffer is the null buffer => add(1) // we assume that pointer is aligned for `i32`, as Utf8 uses `i32` offsets. #[allow(clippy::cast_ptr_alignment)] @@ -444,7 +473,7 @@ impl<'a> ImportedArrowArray<'a> { } // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) - let len = self.buffer_len(1, dt)?; + let len = self.buffer_len(1, variadic_buffer_lengths, dt)?; // first buffer is the null buffer => add(1) // we assume that pointer is aligned for `i64`, as Large uses `i64` offsets. #[allow(clippy::cast_ptr_alignment)] @@ -452,6 +481,16 @@ impl<'a> ImportedArrowArray<'a> { // get last offset (unsafe { *offset_buffer.add(len / size_of::() - 1) }) as usize } + // View types: these have variadic buffers. + // Buffer 1 is the views buffer, which stores 1 u128 per length of the array. + // Buffers 2..N-1 are the buffers holding the byte data. Their lengths are variable. + // Buffer N is of length (N - 2) and stores i64 containing the lengths of buffers 2..N-1 + (DataType::Utf8View, 1) | (DataType::BinaryView, 1) => { + std::mem::size_of::() * length + } + (DataType::Utf8View, i) | (DataType::BinaryView, i) => { + variadic_buffer_lengths[i - 2] as usize + } // buffer len of primitive types _ => { let bits = bit_width(data_type, i)?; @@ -1229,18 +1268,18 @@ mod tests_from_ffi { use arrow_data::ArrayData; use arrow_schema::{DataType, Field}; - use crate::types::Int32Type; + use super::{ImportedArrowArray, Result}; + use crate::builder::GenericByteViewBuilder; + use crate::types::{BinaryViewType, ByteViewType, Int32Type, StringViewType}; use crate::{ array::{ Array, BooleanArray, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray, Int32Array, Int64Array, StringArray, StructArray, UInt32Array, UInt64Array, }, ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema}, - make_array, ArrayRef, ListArray, + make_array, ArrayRef, GenericByteViewArray, ListArray, }; - use super::{ImportedArrowArray, Result}; - fn test_round_trip(expected: &ArrayData) -> Result<()> { // here we export the array let array = FFI_ArrowArray::new(expected); @@ -1453,8 +1492,8 @@ mod tests_from_ffi { owner: &array, }; - let offset_buf_len = imported_array.buffer_len(1, &imported_array.data_type)?; - let data_buf_len = imported_array.buffer_len(2, &imported_array.data_type)?; + let offset_buf_len = imported_array.buffer_len(1, &[], &imported_array.data_type)?; + let data_buf_len = imported_array.buffer_len(2, &[], &imported_array.data_type)?; assert_eq!(offset_buf_len, 4); assert_eq!(data_buf_len, 0); @@ -1472,6 +1511,18 @@ mod tests_from_ffi { StringArray::from(array) } + fn roundtrip_byte_view_array( + array: GenericByteViewArray, + ) -> GenericByteViewArray { + let data = array.into_data(); + + let array = FFI_ArrowArray::new(&data); + let schema = FFI_ArrowSchema::try_from(data.data_type()).unwrap(); + + let array = unsafe { from_ffi(array, &schema) }.unwrap(); + GenericByteViewArray::::from(array) + } + fn extend_array(array: &dyn Array) -> ArrayRef { let len = array.len(); let data = array.to_data(); @@ -1551,4 +1602,93 @@ mod tests_from_ffi { &imported ); } + + /// Helper trait to allow us to use easily strings as either BinaryViewType::Native or + /// StringViewType::Native scalars. + trait NativeFromStr { + fn from_str(value: &str) -> &Self; + } + + impl NativeFromStr for str { + fn from_str(value: &str) -> &Self { + value + } + } + + impl NativeFromStr for [u8] { + fn from_str(value: &str) -> &Self { + value.as_bytes() + } + } + + #[test] + fn test_round_trip_byte_view() { + fn test_case() + where + T: ByteViewType, + T::Native: NativeFromStr, + { + macro_rules! run_test_case { + ($array:expr) => {{ + // round-trip through C Data Interface + let len = $array.len(); + let imported = roundtrip_byte_view_array($array); + assert_eq!(imported.len(), len); + + let copied = extend_array(&imported); + assert_eq!( + copied + .as_any() + .downcast_ref::>() + .unwrap(), + &imported + ); + }}; + } + + // Empty test case. + let empty = GenericByteViewBuilder::::new().finish(); + run_test_case!(empty); + + // All inlined strings test case. + let mut all_inlined = GenericByteViewBuilder::::new(); + all_inlined.append_value(T::Native::from_str("inlined1")); + all_inlined.append_value(T::Native::from_str("inlined2")); + all_inlined.append_value(T::Native::from_str("inlined3")); + let all_inlined = all_inlined.finish(); + assert_eq!(all_inlined.data_buffers().len(), 0); + run_test_case!(all_inlined); + + // some inlined + non-inlined, 1 variadic buffer. + let mixed_one_variadic = { + let mut builder = GenericByteViewBuilder::::new(); + builder.append_value(T::Native::from_str("inlined")); + let block_id = + builder.append_block(Buffer::from("non-inlined-string-buffer".as_bytes())); + builder.try_append_view(block_id, 0, 25).unwrap(); + builder.finish() + }; + assert_eq!(mixed_one_variadic.data_buffers().len(), 1); + run_test_case!(mixed_one_variadic); + + // inlined + non-inlined, 2 variadic buffers. + let mixed_two_variadic = { + let mut builder = GenericByteViewBuilder::::new(); + builder.append_value(T::Native::from_str("inlined")); + let block_id = + builder.append_block(Buffer::from("non-inlined-string-buffer".as_bytes())); + builder.try_append_view(block_id, 0, 25).unwrap(); + + let block_id = builder + .append_block(Buffer::from("another-non-inlined-string-buffer".as_bytes())); + builder.try_append_view(block_id, 0, 33).unwrap(); + builder.finish() + }; + assert_eq!(mixed_two_variadic.data_buffers().len(), 2); + run_test_case!(mixed_two_variadic); + } + + test_case::(); + test_case::(); + } } diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs index 7cd3552215f8..fef2f8008b2a 100644 --- a/arrow-buffer/src/buffer/immutable.rs +++ b/arrow-buffer/src/buffer/immutable.rs @@ -203,7 +203,9 @@ impl Buffer { pub fn advance(&mut self, offset: usize) { assert!( offset <= self.length, - "the offset of the new Buffer cannot exceed the existing length" + "the offset of the new Buffer cannot exceed the existing length: offset={} length={}", + offset, + self.length ); self.length -= offset; // Safety: @@ -221,7 +223,8 @@ impl Buffer { pub fn slice_with_length(&self, offset: usize, length: usize) -> Self { assert!( offset.saturating_add(length) <= self.length, - "the offset of the new Buffer cannot exceed the existing length" + "the offset of the new Buffer cannot exceed the existing length: slice offset={offset} length={length} selflen={}", + self.length ); // Safety: // offset + length <= self.length diff --git a/arrow-data/src/ffi.rs b/arrow-data/src/ffi.rs index 3345595fac19..cd283d32662f 100644 --- a/arrow-data/src/ffi.rs +++ b/arrow-data/src/ffi.rs @@ -20,7 +20,7 @@ use crate::bit_mask::set_bits; use crate::{layout, ArrayData}; use arrow_buffer::buffer::NullBuffer; -use arrow_buffer::{Buffer, MutableBuffer}; +use arrow_buffer::{Buffer, MutableBuffer, ScalarBuffer}; use arrow_schema::DataType; use std::ffi::c_void; @@ -121,7 +121,7 @@ impl FFI_ArrowArray { pub fn new(data: &ArrayData) -> Self { let data_layout = layout(data.data_type()); - let buffers = if data_layout.can_contain_null_mask { + let mut buffers = if data_layout.can_contain_null_mask { // * insert the null buffer at the start // * make all others `Option`. std::iter::once(align_nulls(data.offset(), data.nulls())) @@ -132,7 +132,7 @@ impl FFI_ArrowArray { }; // `n_buffers` is the number of buffers by the spec. - let n_buffers = { + let mut n_buffers = { data_layout.buffers.len() + { // If the layout has a null buffer by Arrow spec. // Note that even the array doesn't have a null buffer because it has @@ -141,10 +141,22 @@ impl FFI_ArrowArray { } } as i64; + if data_layout.variadic { + // Save the lengths of all variadic buffers into a new buffer. + // The first buffer is `views`, and the rest are variadic. + let mut data_buffers_lengths = Vec::new(); + for buffer in data.buffers().iter().skip(1) { + data_buffers_lengths.push(buffer.len() as i64); + n_buffers += 1; + } + + buffers.push(Some(ScalarBuffer::from(data_buffers_lengths).into_inner())); + n_buffers += 1; + } + let buffers_ptr = buffers .iter() .flat_map(|maybe_buffer| match maybe_buffer { - // note that `raw_data` takes into account the buffer's offset Some(b) => Some(b.as_ptr() as *const c_void), // This is for null buffer. We only put a null pointer for // null buffer if by spec it can contain null mask. diff --git a/arrow/src/pyarrow.rs b/arrow/src/pyarrow.rs index 336398cbf22f..a7b593799835 100644 --- a/arrow/src/pyarrow.rs +++ b/arrow/src/pyarrow.rs @@ -354,7 +354,7 @@ impl FromPyArrow for RecordBatch { validate_pycapsule(array_capsule, "arrow_array")?; let schema_ptr = unsafe { schema_capsule.reference::() }; - let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) }; + let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) }; let array_data = unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?; if !matches!(array_data.data_type(), DataType::Struct(_)) { return Err(PyTypeError::new_err( diff --git a/arrow/tests/pyarrow.rs b/arrow/tests/pyarrow.rs index a1c365c31798..d9ebd0daa1cd 100644 --- a/arrow/tests/pyarrow.rs +++ b/arrow/tests/pyarrow.rs @@ -18,6 +18,8 @@ use arrow::array::{ArrayRef, Int32Array, StringArray}; use arrow::pyarrow::{FromPyArrow, ToPyArrow}; use arrow::record_batch::RecordBatch; +use arrow_array::builder::{BinaryViewBuilder, StringViewBuilder}; +use arrow_array::{Array, BinaryViewArray, StringViewArray}; use pyo3::Python; use std::sync::Arc; @@ -27,7 +29,9 @@ fn test_to_pyarrow() { let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"])); - let input = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap(); + // The "very long string" will not be inlined, and force the creation of a data buffer. + let c: ArrayRef = Arc::new(StringViewArray::from(vec!["short", "a very long string"])); + let input = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); println!("input: {:?}", input); let res = Python::with_gil(|py| { @@ -40,3 +44,66 @@ fn test_to_pyarrow() { assert_eq!(input, res); } + +#[test] +fn test_to_pyarrow_byte_view() { + pyo3::prepare_freethreaded_python(); + + for num_variadic_buffers in 0..=2 { + let string_view: ArrayRef = Arc::new(string_view_column(num_variadic_buffers)); + let binary_view: ArrayRef = Arc::new(binary_view_column(num_variadic_buffers)); + + let input = RecordBatch::try_from_iter(vec![ + ("string_view", string_view), + ("binary_view", binary_view), + ]) + .unwrap(); + + println!("input: {:?}", input); + let res = Python::with_gil(|py| { + let py_input = input.to_pyarrow(py)?; + let records = RecordBatch::from_pyarrow_bound(py_input.bind(py))?; + let py_records = records.to_pyarrow(py)?; + RecordBatch::from_pyarrow_bound(py_records.bind(py)) + }) + .unwrap(); + + assert_eq!(input, res); + } +} + +fn binary_view_column(num_variadic_buffers: usize) -> BinaryViewArray { + let long_scalar = b"but soft what light through yonder window breaks".as_slice(); + let mut builder = BinaryViewBuilder::new().with_fixed_block_size(long_scalar.len() as u32); + // Make sure there is at least one non-inlined value. + builder.append_value("inlined".as_bytes()); + + for _ in 0..num_variadic_buffers { + builder.append_value(long_scalar); + } + + let result = builder.finish(); + + assert_eq!(result.data_buffers().len(), num_variadic_buffers); + assert_eq!(result.len(), num_variadic_buffers + 1); + + result +} + +fn string_view_column(num_variadic_buffers: usize) -> StringViewArray { + let long_scalar = "but soft what light through yonder window breaks"; + let mut builder = StringViewBuilder::new().with_fixed_block_size(long_scalar.len() as u32); + // Make sure there is at least one non-inlined value. + builder.append_value("inlined"); + + for _ in 0..num_variadic_buffers { + builder.append_value(long_scalar); + } + + let result = builder.finish(); + + assert_eq!(result.data_buffers().len(), num_variadic_buffers); + assert_eq!(result.len(), num_variadic_buffers + 1); + + result +}