Skip to content

Commit

Permalink
fix: Treat explode as gather (#18431)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Aug 28, 2024
1 parent 37a492e commit 79cffee
Show file tree
Hide file tree
Showing 19 changed files with 106 additions and 305 deletions.
203 changes: 8 additions & 195 deletions crates/polars-core/src/chunked_array/ops/explode.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
use arrow::array::*;
use arrow::bitmap::utils::set_bit_unchecked;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::legacy::array::list::AnonymousBuilder;
#[cfg(feature = "dtype-array")]
use arrow::legacy::is_valid::IsValid;
use arrow::legacy::prelude::*;
use arrow::legacy::trusted_len::TrustedLenPush;
use polars_utils::slice::GetSaferUnchecked;

#[cfg(feature = "dtype-array")]
use crate::chunked_array::builder::get_fixed_size_list_builder;
use crate::chunked_array::metadata::MetadataProperties;
use crate::prelude::*;
use crate::series::implementations::null::NullChunked;

Expand Down Expand Up @@ -160,12 +153,18 @@ where

impl ExplodeByOffsets for Float32Chunked {
fn explode_by_offsets(&self, offsets: &[i64]) -> Series {
self.apply_as_ints(|s| s.explode_by_offsets(offsets))
self.apply_as_ints(|s| {
let ca = s.u32().unwrap();
ca.explode_by_offsets(offsets)
})
}
}
impl ExplodeByOffsets for Float64Chunked {
fn explode_by_offsets(&self, offsets: &[i64]) -> Series {
self.apply_as_ints(|s| s.explode_by_offsets(offsets))
self.apply_as_ints(|s| {
let ca = s.u64().unwrap();
ca.explode_by_offsets(offsets)
})
}
}

Expand Down Expand Up @@ -225,166 +224,6 @@ impl ExplodeByOffsets for BooleanChunked {
}
}

impl ExplodeByOffsets for ListChunked {
/// Builds a new list `Series` from the single chunk of `self`, driven by `offsets`.
///
/// The offsets are walked pairwise. Whenever an offset equals the previous one
/// (a zero-length slot), the values gathered so far are flushed into the builder
/// and an extra null is pushed for that slot. All remaining values are flushed
/// after the loop.
///
/// Only the first chunk is read (the `debug_assert_eq!` documents that exactly one
/// chunk is expected), and `offsets[0]` is indexed directly, so `offsets` must be
/// non-empty.
fn explode_by_offsets(&self, offsets: &[i64]) -> Series {
debug_assert_eq!(self.chunks.len(), 1);
let arr = self.downcast_iter().next().unwrap();

// `cap` sizes both the builder and the `owned` vector below.
let cap = get_capacity(offsets);
let inner_type = self.inner_dtype();

let mut builder = arrow::legacy::array::list::AnonymousBuilder::new(cap);
// Keeps the sliced sub-arrays alive for as long as `builder` refers to them.
let mut owned = Vec::with_capacity(cap);
let mut start = offsets[0] as usize;
let mut last = start;

// Copies the rows `start..last` of `arr` into `builder`, preserving nulls.
let mut process_range = |start: usize, last: usize, builder: &mut AnonymousBuilder<'_>| {
let vals = arr.slice_typed(start, last - start);
for opt_arr in vals.into_iter() {
match opt_arr {
None => builder.push_null(),
Some(arr) => {
unsafe {
// Take a raw pointer to the array so that it can be moved into `owned`
// while the builder still receives a reference to it (this sidesteps
// the borrow checker).
let ptr = arr.as_ref() as *const dyn Array;
// SAFETY: `owned` was preallocated with `cap` slots.
// NOTE(review): this relies on `get_capacity(offsets)` being an upper
// bound on the number of arrays pushed here -- confirm.
owned.push_unchecked(arr);
// SAFETY: the pointer is still valid as `owned` will not reallocate
builder.push(&*ptr as &dyn Array);
}
},
}
}
};

for &o in &offsets[1..] {
let o = o as usize;
// A repeated offset means a zero-length slot.
if o == last {
// Flush the pending non-empty range before emitting the null.
if start != last {
process_range(start, last, &mut builder);
}
builder.push_null();
start = o;
}
last = o;
}
// Flush whatever is left after the final offset.
process_range(start, last, &mut builder);
let arr = builder
.finish(Some(&inner_type.to_arrow(CompatLevel::newest())))
.unwrap();
let mut ca = unsafe { self.copy_with_chunks(vec![Box::new(arr)]) };

// Carry over only the sortedness and fast-explode metadata from `self`.
use MetadataProperties as P;
ca.copy_metadata(self, P::SORTED | P::FAST_EXPLODE_LIST);

ca.into_series()
}
}

#[cfg(feature = "dtype-array")]
impl ExplodeByOffsets for ArrayChunked {
/// Builds a new fixed-size-list `Series` from the single chunk of `self`, driven by
/// `offsets`.
///
/// The offsets are walked pairwise. Whenever an offset equals the previous one
/// (a zero-length slot), the rows gathered so far are copied into the builder and
/// an extra null is pushed for that slot. The remaining rows are copied after the
/// loop.
///
/// Only the first chunk is read (the `debug_assert_eq!` documents that exactly one
/// chunk is expected), and `offsets[0]` is indexed directly, so `offsets` must be
/// non-empty.
fn explode_by_offsets(&self, offsets: &[i64]) -> Series {
debug_assert_eq!(self.chunks.len(), 1);
let arr = self.downcast_iter().next().unwrap();

let cap = get_capacity(offsets);
let inner_type = self.inner_dtype();
// The builder keeps the inner dtype, width and name of `self`.
let mut builder =
get_fixed_size_list_builder(inner_type, cap, self.width(), self.name()).unwrap();

let mut start = offsets[0] as usize;
let mut last = start;
for &o in &offsets[1..] {
let o = o as usize;
// A repeated offset means a zero-length slot.
if o == last {
// Copy the pending non-empty range `start..last` before emitting the null.
if start != last {
let array = arr.slice_typed(start, last - start);
let values = array.values().as_ref();

for i in 0..array.len() {
// NOTE(review): no SAFETY justification is given in the original; `i` is
// bounded by `array.len()` -- confirm that this satisfies the contracts of
// `is_valid_unchecked` and `push_unchecked`.
unsafe {
if array.is_valid_unchecked(i) {
builder.push_unchecked(values, i)
} else {
builder.push_null()
}
}
}
}
unsafe {
builder.push_null();
}
start = o;
}
last = o;
}
// Copy whatever is left after the final offset (same logic as inside the loop).
let array = arr.slice_typed(start, last - start);
let values = array.values().as_ref();
for i in 0..array.len() {
unsafe {
if array.is_valid_unchecked(i) {
builder.push_unchecked(values, i)
} else {
builder.push_null()
}
}
}

builder.finish().into()
}
}

impl ExplodeByOffsets for StringChunked {
    /// Explodes through the binary representation of the strings and casts the
    /// result back to `DataType::String` without re-validating it.
    fn explode_by_offsets(&self, offsets: &[i64]) -> Series {
        // SAFETY: the bytes come from `self`, a string array, and the binary explode
        // only copies whole values or inserts nulls.
        // NOTE(review): assumes the unchecked cast's only requirement is valid
        // UTF-8 -- confirm.
        unsafe {
            let bytes = self.as_binary();
            let exploded = bytes.explode_by_offsets(offsets);
            exploded.cast_unchecked(&DataType::String).unwrap()
        }
    }
}

impl ExplodeByOffsets for BinaryChunked {
fn explode_by_offsets(&self, offsets: &[i64]) -> Series {
debug_assert_eq!(self.chunks.len(), 1);
let arr = self.downcast_iter().next().unwrap();

let cap = get_capacity(offsets);
let mut builder = BinaryChunkedBuilder::new(self.name(), cap);

let mut start = offsets[0] as usize;
let mut last = start;
for &o in &offsets[1..] {
let o = o as usize;
if o == last {
if start != last {
let vals = arr.slice_typed(start, last - start);
if vals.null_count() == 0 {
builder
.chunk_builder
.extend_trusted_len_values(vals.values_iter())
} else {
builder.chunk_builder.extend_trusted_len(vals.into_iter());
}
}
builder.append_null();
start = o;
}
last = o;
}
let vals = arr.slice_typed(start, last - start);
if vals.null_count() == 0 {
builder
.chunk_builder
.extend_trusted_len_values(vals.values_iter())
} else {
builder.chunk_builder.extend_trusted_len(vals.into_iter());
}
builder.finish().into()
}
}

/// Convert Arrow array offsets to indexes of the original list
pub(crate) fn offsets_to_indexes(offsets: &[i64], capacity: usize) -> Vec<IdxSize> {
if offsets.is_empty() {
Expand Down Expand Up @@ -454,32 +293,6 @@ mod test {
Ok(())
}

#[test]
fn test_explode_list_nulls() -> PolarsResult<()> {
    // The repeated trailing offset describes a zero-length slot, which must show up
    // as one extra null after the three original values.
    let offsets: &[i64] = &[0, 3, 3];

    // Numeric values.
    let ints = Int32Chunked::from_slice_options("", &[None, Some(1), Some(2)]);
    let exploded = ints.explode_by_offsets(offsets);
    assert_eq!(
        Vec::from(exploded.i32().unwrap()),
        &[None, Some(1), Some(2), None]
    );

    // Boolean values.
    let bools = BooleanChunked::from_slice_options("", &[None, Some(true), Some(false)]);
    let exploded = bools.explode_by_offsets(offsets);
    assert_eq!(
        Vec::from(exploded.bool().unwrap()),
        &[None, Some(true), Some(false), None]
    );

    // String values.
    let strings = StringChunked::from_slice_options("", &[None, Some("b"), Some("c")]);
    let exploded = strings.explode_by_offsets(offsets);
    assert_eq!(
        Vec::from(exploded.str().unwrap()),
        &[None, Some("b"), Some("c"), None]
    );

    Ok(())
}

#[test]
fn test_explode_empty_list_slot() -> PolarsResult<()> {
// primitive
Expand Down
115 changes: 91 additions & 24 deletions crates/polars-core/src/chunked_array/ops/explode_and_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,51 @@ use arrow::offset::OffsetsBuffer;

use super::*;

impl ListChunked {
    /// Explodes `values` with the type-specialized `ExplodeByOffsets` kernels.
    ///
    /// `values` is wrapped in a `Series` of the physical inner dtype, exploded
    /// according to `offsets`, and then cast back to the logical inner dtype.
    /// `offsets_buf` is returned unchanged next to the exploded values.
    ///
    /// Dispatch covers `Boolean`, `Null` and whatever
    /// `with_match_physical_numeric_polars_type!` accepts.
    fn specialized(
        &self,
        values: ArrayRef,
        offsets: &[i64],
        offsets_buf: OffsetsBuffer<i64>,
    ) -> (Series, OffsetsBuffer<i64>) {
        use crate::chunked_array::ops::explode::ExplodeByOffsets;

        // SAFETY: inner_dtype should be correct
        let physical = unsafe {
            Series::from_chunks_and_dtype_unchecked(
                self.name(),
                vec![values],
                &self.inner_dtype().to_physical(),
            )
        };

        let exploded = match physical.dtype() {
            DataType::Boolean => {
                let ca = physical.bool().unwrap();
                ExplodeByOffsets::explode_by_offsets(ca, offsets).into_series()
            },
            DataType::Null => {
                let ca = physical.null().unwrap();
                ExplodeByOffsets::explode_by_offsets(ca, offsets).into_series()
            },
            dtype => {
                with_match_physical_numeric_polars_type!(dtype, |$T| {
                    let ca: &ChunkedArray<$T> = physical.as_ref().as_ref();
                    ExplodeByOffsets::explode_by_offsets(ca, offsets).into_series()
                })
            },
        };

        // Restore the logical type that was stripped by `to_physical()` above.
        let logical = unsafe { exploded.cast_unchecked(self.inner_dtype()) }.unwrap();

        (logical, offsets_buf)
    }
}

impl ChunkExplode for ListChunked {
fn offsets(&self) -> PolarsResult<OffsetsBuffer<i64>> {
let ca = self.rechunk();
Expand Down Expand Up @@ -64,16 +109,36 @@ impl ChunkExplode for ListChunked {
panic!("could have fast exploded")
}
}
if listarr.null_count() == 0 {
// SAFETY: inner_dtype should be correct
let values = unsafe {
Series::from_chunks_and_dtype_unchecked(
self.name(),
vec![values],
&self.inner_dtype().to_physical(),
)
};
(values.explode_by_offsets(offsets), offsets_buf)
let (indices, new_offsets) = if listarr.null_count() == 0 {
// SPECIALIZED path.
let inner_phys = self.inner_dtype().to_physical();
if inner_phys.is_numeric() || inner_phys.is_null() || inner_phys.is_bool() {
return Ok(self.specialized(values, offsets, offsets_buf));
}
// Use gather
let mut indices =
MutablePrimitiveArray::<IdxSize>::with_capacity(*offsets_buf.last() as usize);
let mut new_offsets = Vec::with_capacity(listarr.len() + 1);
let mut current_offset = 0i64;
let mut iter = offsets.iter();
if let Some(mut previous) = iter.next().copied() {
new_offsets.push(current_offset);
iter.for_each(|&offset| {
let len = offset - previous;
let start = previous as IdxSize;
let end = offset as IdxSize;

if len == 0 {
indices.push_null();
} else {
indices.extend_trusted_len_values(start..end);
}
current_offset += len;
previous = offset;
new_offsets.push(current_offset);
})
}
(indices, new_offsets)
} else {
// we have already ensured that validity is not none.
let validity = listarr.validity().unwrap();
Expand Down Expand Up @@ -105,20 +170,22 @@ impl ChunkExplode for ListChunked {
new_offsets.push(current_offset);
})
}
// SAFETY: the indices we generate are in bounds
let chunk = unsafe { take_unchecked(values.as_ref(), &indices.into()) };
// SAFETY: inner_dtype should be correct
let s = unsafe {
Series::from_chunks_and_dtype_unchecked(
self.name(),
vec![chunk],
&self.inner_dtype().to_physical(),
)
};
// SAFETY: monotonically increasing
let new_offsets = unsafe { OffsetsBuffer::new_unchecked(new_offsets.into()) };
(s, new_offsets)
}
(indices, new_offsets)
};

// SAFETY: the indices we generate are in bounds
let chunk = unsafe { take_unchecked(values.as_ref(), &indices.into()) };
// SAFETY: inner_dtype should be correct
let s = unsafe {
Series::from_chunks_and_dtype_unchecked(
self.name(),
vec![chunk],
&self.inner_dtype().to_physical(),
)
};
// SAFETY: monotonically increasing
let new_offsets = unsafe { OffsetsBuffer::new_unchecked(new_offsets.into()) };
(s, new_offsets)
};
debug_assert_eq!(s.name(), self.name());
// restore logical type
Expand Down
5 changes: 0 additions & 5 deletions crates/polars-core/src/series/implementations/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::borrow::Cow;
use super::{private, MetadataFlags};
use crate::chunked_array::cast::CastOptions;
use crate::chunked_array::comparison::*;
use crate::chunked_array::ops::explode::ExplodeByOffsets;
use crate::chunked_array::AsSinglePtr;
#[cfg(feature = "algorithm_group_by")]
use crate::frame::group_by::*;
Expand All @@ -30,10 +29,6 @@ impl private::PrivateSeries for SeriesWrap<ArrayChunked> {
self.0.set_flags(flags)
}

fn explode_by_offsets(&self, offsets: &[i64]) -> Series {
self.0.explode_by_offsets(offsets)
}

unsafe fn equal_element(&self, idx_self: usize, idx_other: usize, other: &Series) -> bool {
self.0.equal_element(idx_self, idx_other, other)
}
Expand Down
Loading

0 comments on commit 79cffee

Please sign in to comment.