Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preallocate for FixedSizeList in concat #5862

Merged
merged 10 commits into from
Jun 21, 2024
20 changes: 17 additions & 3 deletions arrow-data/src/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,10 @@ impl<'a> MutableArrayData<'a> {
array_capacity = *capacity;
new_buffers(data_type, *capacity)
}
(DataType::List(_) | DataType::LargeList(_), Capacities::List(capacity, _)) => {
(
DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _),
Capacities::List(capacity, _),
) => {
array_capacity = *capacity;
new_buffers(data_type, *capacity)
}
Expand Down Expand Up @@ -501,12 +504,23 @@ impl<'a> MutableArrayData<'a> {
MutableArrayData::new(value_child, use_nulls, array_capacity),
]
}
DataType::FixedSizeList(_, _) => {
DataType::FixedSizeList(_, size) => {
let children = arrays
.iter()
.map(|array| &array.child_data()[0])
.collect::<Vec<_>>();
vec![MutableArrayData::new(children, use_nulls, array_capacity)]
let capacities =
if let Capacities::List(capacity, ref child_capacities) = capacities {
child_capacities
.clone()
.map(|c| *c)
.unwrap_or(Capacities::Array(capacity * *size as usize))
} else {
Capacities::Array(array_capacity * *size as usize)
};
vec![MutableArrayData::with_capacities(
children, use_nulls, capacities,
)]
}
DataType::Union(fields, _) => (0..fields.len())
.map(|i| {
Expand Down
89 changes: 77 additions & 12 deletions arrow-select/src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,34 @@ fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
Capacities::Binary(item_capacity, Some(bytes_capacity))
}

fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
if let DataType::FixedSizeList(f, _) = data_type {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be returning Capacities::Array() if the children are primitive / fixed size? I ask because I see it handled above and it seems like it would avoid some allocations and iteration over children.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, that's fair we can avoid at least one iteration and a Box::new() if only recursively call get_capacity() when we know that it will have some special handling other than Capacities::Array(arrays.iter().map(|a| a.len()).sum()).

b30566a

let item_capacity = arrays.iter().map(|a| a.len()).sum();
let child_data_type = f.data_type();
match child_data_type {
// These types should match the types that `get_capacity`
// has special handling for.
DataType::Utf8
| DataType::LargeUtf8
| DataType::Binary
| DataType::LargeBinary
| DataType::FixedSizeList(_, _) => {
let values: Vec<&dyn arrow_array::Array> = arrays
.iter()
.map(|a| a.as_fixed_size_list().values().as_ref())
.collect();
Capacities::List(
item_capacity,
Some(Box::new(get_capacity(&values, child_data_type))),
)
},
_ => Capacities::Array(item_capacity),
}
} else {
unreachable!("illegal data type for fixed size list")
}
}

fn concat_dictionaries<K: ArrowDictionaryKeyType>(
arrays: &[&dyn Array],
) -> Result<ArrayRef, ArrowError> {
Expand Down Expand Up @@ -107,6 +135,17 @@ macro_rules! dict_helper {
};
}

fn get_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
match data_type {
DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
DataType::Binary => binary_capacity::<BinaryType>(arrays),
DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
DataType::FixedSizeList(_, _) => fixed_size_list_capacity(arrays, data_type),
_ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
}
}

/// Concatenate multiple [Array] of the same type into a single [ArrayRef].
pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
if arrays.is_empty() {
Expand All @@ -124,20 +163,15 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
"It is not possible to concatenate arrays of different data types.".to_string(),
));
}

let capacity = match d {
DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
DataType::Binary => binary_capacity::<BinaryType>(arrays),
DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
DataType::Dictionary(k, _) => downcast_integer! {
if let DataType::Dictionary(k, _) = d {
downcast_integer! {
k.as_ref() => (dict_helper, arrays),
_ => unreachable!("illegal dictionary key type {k}")
},
_ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
};

concat_fallback(arrays, capacity)
};
} else {
let capacity = get_capacity(arrays, d);
concat_fallback(arrays, capacity)
}
}

/// Concatenates arrays using MutableArrayData
Expand Down Expand Up @@ -373,6 +407,37 @@ mod tests {
assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
}

#[test]
fn test_concat_primitive_fixed_size_list_arrays() {
let list1 = vec![
Some(vec![Some(-1), None]),
None,
Some(vec![Some(10), Some(20)]),
];
let list1_array =
FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone(), 2);

let list2 = vec![
None,
Some(vec![Some(100), None]),
Some(vec![Some(102), Some(103)]),
];
let list2_array =
FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone(), 2);

let list3 = vec![Some(vec![Some(1000), Some(1001)])];
let list3_array =
FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list3.clone(), 2);

let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();

let expected = list1.into_iter().chain(list2).chain(list3);
let array_expected =
FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(expected, 2);

assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
}

#[test]
fn test_concat_struct_arrays() {
let field = Arc::new(Field::new("field", DataType::Int64, true));
Expand Down
20 changes: 20 additions & 0 deletions arrow/benches/concatenate_kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#[macro_use]
extern crate criterion;
use std::sync::Arc;

use criterion::Criterion;

extern crate arrow;
Expand Down Expand Up @@ -82,6 +84,24 @@ fn add_benchmark(c: &mut Criterion) {
c.bench_function("concat str nulls 1024", |b| {
b.iter(|| bench_concat(&v1, &v2))
});

let v1 = FixedSizeListArray::try_new(
Arc::new(Field::new("item", DataType::Int32, true)),
1024,
Arc::new(create_primitive_array::<Int32Type>(1024 * 1024, 0.0)),
None,
)
.unwrap();
let v2 = FixedSizeListArray::try_new(
Arc::new(Field::new("item", DataType::Int32, true)),
1024,
Arc::new(create_primitive_array::<Int32Type>(1024 * 1024, 0.0)),
None,
)
.unwrap();
c.bench_function("concat fixed size lists", |b| {
b.iter(|| bench_concat(&v1, &v2))
});
}

criterion_group!(benches, add_benchmark);
Expand Down
92 changes: 41 additions & 51 deletions arrow/tests/array_transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

use arrow::array::{
Array, ArrayRef, BooleanArray, Decimal128Array, DictionaryArray, FixedSizeBinaryArray,
Int16Array, Int32Array, Int64Array, Int64Builder, ListArray, ListBuilder, MapBuilder,
NullArray, StringArray, StringBuilder, StringDictionaryBuilder, StructArray, UInt8Array,
UnionArray,
FixedSizeListBuilder, Int16Array, Int32Array, Int64Array, Int64Builder, ListArray, ListBuilder,
MapBuilder, NullArray, StringArray, StringBuilder, StringDictionaryBuilder, StructArray,
UInt16Array, UInt16Builder, UInt8Array, UnionArray,
};
use arrow::datatypes::Int16Type;
use arrow_array::StringViewArray;
Expand Down Expand Up @@ -1074,43 +1074,42 @@ fn test_mixed_types() {
MutableArrayData::new(vec![&a, &b], false, 4);
}

/*
// this is an old test used on a meanwhile removed dead code
// that is still useful when `MutableArrayData` supports fixed-size lists.
#[test]
fn test_fixed_size_list_append() -> Result<()> {
let int_builder = UInt16Builder::new(64);
fn test_fixed_size_list_append() {
let int_builder = UInt16Builder::with_capacity(64);
let mut builder = FixedSizeListBuilder::<UInt16Builder>::new(int_builder, 2);
builder.values().append_slice(&[1, 2])?;
builder.append(true)?;
builder.values().append_slice(&[3, 4])?;
builder.append(false)?;
builder.values().append_slice(&[5, 6])?;
builder.append(true)?;

let a_builder = UInt16Builder::new(64);
builder.values().append_slice(&[1, 2]);
builder.append(true);
builder.values().append_slice(&[3, 4]);
builder.append(false);
builder.values().append_slice(&[5, 6]);
builder.append(true);
let a = builder.finish().into_data();

let a_builder = UInt16Builder::with_capacity(64);
let mut a_builder = FixedSizeListBuilder::<UInt16Builder>::new(a_builder, 2);
a_builder.values().append_slice(&[7, 8])?;
a_builder.append(true)?;
a_builder.values().append_slice(&[9, 10])?;
a_builder.append(true)?;
a_builder.values().append_slice(&[11, 12])?;
a_builder.append(false)?;
a_builder.values().append_slice(&[13, 14])?;
a_builder.append(true)?;
a_builder.values().append_null()?;
a_builder.values().append_null()?;
a_builder.append(true)?;
let a = a_builder.finish();
a_builder.values().append_slice(&[7, 8]);
a_builder.append(true);
a_builder.values().append_slice(&[9, 10]);
a_builder.append(true);
a_builder.values().append_slice(&[11, 12]);
a_builder.append(false);
a_builder.values().append_slice(&[13, 14]);
a_builder.append(true);
a_builder.values().append_null();
a_builder.values().append_null();
a_builder.append(true);
let b = a_builder.finish().into_data();

let mut mutable = MutableArrayData::new(vec![&a, &b], false, 10);
mutable.extend(0, 0, a.len());
mutable.extend(1, 0, b.len());

// append array
builder.append_data(&[
a.data(),
a.slice(1, 3).data(),
a.slice(2, 1).data(),
a.slice(5, 0).data(),
])?;
let finished = builder.finish();
mutable.extend(1, 1, 4);
mutable.extend(1, 2, 3);

let finished = mutable.freeze();

let expected_int_array = UInt16Array::from(vec![
Some(1),
Expand Down Expand Up @@ -1141,23 +1140,14 @@ fn test_fixed_size_list_append() -> Result<()> {
Some(11),
Some(12),
]);
let expected_list_data = ArrayData::new(
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::UInt16, true)),
2,
),
let expected_fixed_size_list_data = ArrayData::try_new(
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::UInt16, true)), 2),
12,
None,
None,
Some(Buffer::from(&[0b11011101, 0b101])),
0,
vec![],
vec![expected_int_array.data()],
);
let expected_list =
FixedSizeListArray::from(Arc::new(expected_list_data) as ArrayData);
assert_eq!(&expected_list.values(), &finished.values());
assert_eq!(expected_list.len(), finished.len());

Ok(())
vec![expected_int_array.to_data()],
)
.unwrap();
assert_eq!(finished, expected_fixed_size_list_data);
}
*/
Loading