Skip to content

Commit

Permalink
feat: Implement compression and skipping for binview IPC (#14789)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Feb 29, 2024
1 parent 4967150 commit 2d3b3b2
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 25 deletions.
31 changes: 31 additions & 0 deletions crates/polars-arrow/src/io/ipc/read/array/binview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,34 @@ pub fn read_binview<T: ViewType + ?Sized, R: Read + Seek>(
BinaryViewArrayGeneric::<T>::try_new(data_type, views, Arc::from(variadic_buffers), validity)
.map(|arr| arr.boxed())
}

/// Skips one binary-view / utf8-view array without deserializing it.
///
/// This function takes no reader: it only advances the metadata queues so that
/// the next array starts at the right position. It consumes, in order:
///
/// 1. one entry from `field_nodes`,
/// 2. two entries from `buffers` (reported as the validity and views buffers),
/// 3. one entry from `variadic_buffer_counts`, giving the number `n` of
///    variadic buffers that belong to this array,
/// 4. `n` further entries from `buffers`.
///
/// # Errors
///
/// Returns an error as soon as any of the queues runs out of entries. Entries
/// popped before the failure are not restored.
pub fn skip_binview(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<IpcBuffer>,
    variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
    let _ = field_nodes.pop_front().ok_or_else(|| {
        polars_err!(
            oos = "IPC: unable to fetch the field for binview. The file or stream is corrupted."
        )
    })?;

    let _ = buffers
        .pop_front()
        .ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;

    let _ = buffers
        .pop_front()
        .ok_or_else(|| polars_err!(oos = "IPC: missing views buffer."))?;

    // Unlike the fixed-layout arrays, the number of remaining buffers is not
    // implied by the data type; it is taken from the per-array count queue.
    let n_variadic = variadic_buffer_counts.pop_front().ok_or_else(
        || polars_err!(ComputeError: "IPC: unable to fetch the variadic buffers\n\nThe file or stream is corrupted.")
    )?;

    for _ in 0..n_variadic {
        let _ = buffers
            .pop_front()
            .ok_or_else(|| polars_err!(oos = "IPC: missing variadic buffer"))?;
    }
    Ok(())
}
8 changes: 7 additions & 1 deletion crates/polars-arrow/src/io/ipc/read/array/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub fn skip_fixed_size_list(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
polars_err!(oos =
Expand All @@ -79,5 +80,10 @@ pub fn skip_fixed_size_list(

let (field, _) = FixedSizeListArray::get_child_and_size(data_type);

skip(field_nodes, field.data_type(), buffers)
skip(
field_nodes,
field.data_type(),
buffers,
variadic_buffer_counts,
)
}
3 changes: 2 additions & 1 deletion crates/polars-arrow/src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub fn skip_list<O: Offset>(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
polars_err!(
Expand All @@ -101,5 +102,5 @@ pub fn skip_list<O: Offset>(

let data_type = ListArray::<O>::get_child_type(data_type);

skip(field_nodes, data_type, buffers)
skip(field_nodes, data_type, buffers, variadic_buffer_counts)
}
3 changes: 2 additions & 1 deletion crates/polars-arrow/src/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub fn skip_map(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
polars_err!(
Expand All @@ -97,5 +98,5 @@ pub fn skip_map(

let data_type = MapArray::get_field(data_type).data_type();

skip(field_nodes, data_type, buffers)
skip(field_nodes, data_type, buffers, variadic_buffer_counts)
}
12 changes: 9 additions & 3 deletions crates/polars-arrow/src/io/ipc/read/array/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub fn skip_struct(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
polars_err!(
Expand All @@ -84,7 +85,12 @@ pub fn skip_struct(

let fields = StructArray::get_fields(data_type);

fields
.iter()
.try_for_each(|field| skip(field_nodes, field.data_type(), buffers))
fields.iter().try_for_each(|field| {
skip(
field_nodes,
field.data_type(),
buffers,
variadic_buffer_counts,
)
})
}
12 changes: 9 additions & 3 deletions crates/polars-arrow/src/io/ipc/read/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub fn skip_union(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
polars_err!(
Expand All @@ -117,7 +118,12 @@ pub fn skip_union(

let fields = UnionArray::get_fields(data_type);

fields
.iter()
.try_for_each(|field| skip(field_nodes, field.data_type(), buffers))
fields.iter().try_for_each(|field| {
skip(
field_nodes,
field.data_type(),
buffers,
variadic_buffer_counts,
)
})
}
7 changes: 6 additions & 1 deletion crates/polars-arrow/src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ pub fn read_record_batch<R: Read + Seek>(
scratch,
)?)),
ProjectionResult::NotSelected((field, _)) => {
skip(&mut field_nodes, &field.data_type, &mut buffers)?;
skip(
&mut field_nodes,
&field.data_type,
&mut buffers,
&mut variadic_buffer_counts,
)?;
Ok(None)
},
})
Expand Down
17 changes: 10 additions & 7 deletions crates/polars-arrow/src/io/ipc/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ pub fn skip(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
use PhysicalType::*;
match data_type.to_physical_type() {
Expand All @@ -272,13 +273,15 @@ pub fn skip(
LargeBinary | Binary => skip_binary(field_nodes, buffers),
LargeUtf8 | Utf8 => skip_utf8(field_nodes, buffers),
FixedSizeBinary => skip_fixed_size_binary(field_nodes, buffers),
List => skip_list::<i32>(field_nodes, data_type, buffers),
LargeList => skip_list::<i64>(field_nodes, data_type, buffers),
FixedSizeList => skip_fixed_size_list(field_nodes, data_type, buffers),
Struct => skip_struct(field_nodes, data_type, buffers),
List => skip_list::<i32>(field_nodes, data_type, buffers, variadic_buffer_counts),
LargeList => skip_list::<i64>(field_nodes, data_type, buffers, variadic_buffer_counts),
FixedSizeList => {
skip_fixed_size_list(field_nodes, data_type, buffers, variadic_buffer_counts)
},
Struct => skip_struct(field_nodes, data_type, buffers, variadic_buffer_counts),
Dictionary(_) => skip_dictionary(field_nodes, buffers),
Union => skip_union(field_nodes, data_type, buffers),
Map => skip_map(field_nodes, data_type, buffers),
BinaryView | Utf8View => todo!(),
Union => skip_union(field_nodes, data_type, buffers, variadic_buffer_counts),
Map => skip_map(field_nodes, data_type, buffers, variadic_buffer_counts),
BinaryView | Utf8View => skip_binview(field_nodes, buffers, variadic_buffer_counts),
}
}
19 changes: 11 additions & 8 deletions crates/polars-arrow/src/io/ipc/read/read_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ fn read_uncompressed_buffer<T: NativeType, R: Read + Seek>(
fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
reader: &mut R,
buffer_length: usize,
length: usize,
output_length: Option<usize>,
is_little_endian: bool,
compression: Compression,
scratch: &mut Vec<u8>,
) -> PolarsResult<Vec<T>> {
if length == 0 {
if output_length == Some(0) {
return Ok(vec![]);
}

Expand All @@ -111,10 +111,6 @@ fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
)
}

// It is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
// see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
let mut buffer = vec![T::default(); length];

// decompress first
scratch.clear();
scratch.try_reserve(buffer_length)?;
Expand All @@ -123,6 +119,13 @@ fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
.take(buffer_length as u64)
.read_to_end(scratch)?;

let length = output_length
.unwrap_or_else(|| i64::from_le_bytes(scratch[..8].try_into().unwrap()) as usize);

// It is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
// see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
let mut buffer = vec![T::default(); length];

let out_slice = bytemuck::cast_slice_mut(&mut buffer);

let compression = compression
Expand Down Expand Up @@ -150,7 +153,7 @@ fn read_compressed_bytes<R: Read + Seek>(
read_compressed_buffer::<u8, _>(
reader,
buffer_length,
buffer_length,
None,
is_little_endian,
compression,
scratch,
Expand Down Expand Up @@ -224,7 +227,7 @@ pub fn read_buffer<T: NativeType, R: Read + Seek>(
Ok(read_compressed_buffer(
reader,
buffer_length,
length,
Some(length),
is_little_endian,
compression,
scratch,
Expand Down
23 changes: 23 additions & 0 deletions crates/polars/tests/it/io/ipc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::io::{Seek, SeekFrom};

use polars::prelude::*;

/// Round-trips a small string column through an LZ4-compressed IPC file
/// written with the polars flavor enabled, and checks the shape that is
/// read back.
#[test]
fn test_ipc_compression_variadic_buffers() {
    // Three identical copies of the same string value.
    let values = vec!["Home delivery vat 24 %"; 3];
    let mut frame = df!["foo" => values].unwrap();

    // Write into an in-memory buffer.
    let mut cursor = std::io::Cursor::new(Vec::new());
    let writer = IpcWriter::new(&mut cursor)
        .with_compression(Some(IpcCompression::LZ4))
        .with_pl_flavor(true);
    writer.finish(&mut frame).unwrap();

    // Rewind to the start before reading the file back.
    cursor.seek(SeekFrom::Start(0)).unwrap();
    let roundtripped = IpcReader::new(cursor).finish().unwrap();

    assert_eq!(roundtripped.shape(), (3, 1));
}
2 changes: 2 additions & 0 deletions crates/polars/tests/it/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ mod parquet;
#[cfg(feature = "avro")]
mod avro;

#[cfg(feature = "ipc")]
mod ipc;
#[cfg(feature = "ipc_streaming")]
mod ipc_stream;

Expand Down

0 comments on commit 2d3b3b2

Please sign in to comment.