Skip to content

Commit

Permalink
Switch to using arrow2::bitmap::Bitmap for validity
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Aug 19, 2023
1 parent 7e2ffb0 commit 931758a
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 71 deletions.
5 changes: 1 addition & 4 deletions src/daft-core/src/array/ops/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,7 @@ impl Broadcastable for FixedSizeListArray {
let validity = if self.is_valid(0) {
None
} else {
Some(BooleanArray::from((
"",
repeat(false).take(num).collect::<Vec<_>>().as_slice(),
)))
Some(arrow2::bitmap::Bitmap::from_iter(repeat(false).take(num)))
};
Ok(Self::new(self.field.clone(), concatted, validity))
}
Expand Down
12 changes: 2 additions & 10 deletions src/daft-core/src/array/ops/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,7 @@ impl DaftCountAggable for &FixedSizeListArray {
type Output = DaftResult<DataArray<UInt64Type>>;

fn count(&self, mode: CountMode) -> Self::Output {
let arrow_bitmap = self
.validity
.as_ref()
.map(|v| arrow2::bitmap::Bitmap::from_iter(v.into_iter().map(|v| v.unwrap())));
let count = count_arrow_bitmap(&mode, arrow_bitmap.as_ref(), self.len());
let count = count_arrow_bitmap(&mode, self.validity.as_ref(), self.len());
let result_arrow_array = Box::new(arrow2::array::PrimitiveArray::from([Some(count)]));
DataArray::<UInt64Type>::new(
Arc::new(Field::new(self.field.name.clone(), DataType::UInt64)),
Expand All @@ -111,12 +107,8 @@ impl DaftCountAggable for &FixedSizeListArray {
}

fn grouped_count(&self, groups: &GroupIndices, mode: CountMode) -> Self::Output {
let arrow_bitmap = self
.validity
.as_ref()
.map(|v| arrow2::bitmap::Bitmap::from_iter(v.into_iter().map(|v| v.unwrap())));
let counts_per_group: Vec<_> =
grouped_count_arrow_bitmap(groups, &mode, arrow_bitmap.as_ref());
grouped_count_arrow_bitmap(groups, &mode, self.validity.as_ref());
Ok(DataArray::<UInt64Type>::from((
self.field.name.as_ref(),
counts_per_group,
Expand Down
10 changes: 9 additions & 1 deletion src/daft-core/src/array/ops/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,15 @@ impl crate::datatypes::nested_arrays::FixedSizeListArray {
.collect();
let expanded_filter = BooleanArray::from(("", expanded_filter.as_slice()));
let filtered_child = self.flat_child.filter(&expanded_filter)?;
let filtered_validity = self.validity.as_ref().map(|v| v.filter(mask).unwrap());
let filtered_validity = self.validity.as_ref().map(|validity| {
arrow2::bitmap::Bitmap::from_iter(mask.into_iter().zip(validity.iter()).filter_map(
|(keep, valid)| match keep {
None => None,
Some(false) => None,
Some(true) => Some(valid),
},
))
});
Ok(Self::new(
self.field.clone(),
filtered_child,
Expand Down
9 changes: 3 additions & 6 deletions src/daft-core/src/array/ops/from_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use common_error::{DaftError, DaftResult};
use crate::{
array::DataArray,
datatypes::{
logical::LogicalArray, nested_arrays::FixedSizeListArray, BooleanArray, DaftDataType,
DaftLogicalType, DaftPhysicalType, Field,
logical::LogicalArray, nested_arrays::FixedSizeListArray, DaftDataType, DaftLogicalType,
DaftPhysicalType, Field,
},
series::IntoSeries,
with_match_daft_types, DataType,
Expand Down Expand Up @@ -54,13 +54,10 @@ impl FromArrow for FixedSizeListArray {
let child_series = with_match_daft_types!(daft_child_field.dtype, |$T| {
<$T as DaftDataType>::ArrayType::from_arrow(daft_child_field.as_ref(), arrow_child_array.clone())?.into_series()
});
let validity = arrow_arr.validity().map(
|v| BooleanArray::from(("", v.iter().collect::<Vec<bool>>().as_slice()))
);
Ok(FixedSizeListArray::new(
Arc::new(field.clone()),
child_series,
validity,
arrow_arr.validity().cloned(),
))
}
(d, a) => Err(DaftError::TypeError(format!("Attempting to create Daft FixedSizeListArray with type {:?} from arrow array with type {}", a, d)))
Expand Down
11 changes: 4 additions & 7 deletions src/daft-core/src/array/ops/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use pyo3::Python;
use crate::{
array::{pseudo_arrow::PseudoArrowArray, DataArray},
datatypes::{
logical::LogicalArray, nested_arrays::FixedSizeListArray, BooleanArray, DaftDataType,
DaftLogicalType, DaftPhysicalType, DataType, Field,
logical::LogicalArray, nested_arrays::FixedSizeListArray, DaftDataType, DaftLogicalType,
DaftPhysicalType, DataType, Field,
},
};

Expand Down Expand Up @@ -88,11 +88,8 @@ where
impl FullNull for FixedSizeListArray {
fn full_null(name: &str, dtype: &DataType, length: usize) -> Self {
let empty = Self::empty(name, dtype);
let validity = Some(BooleanArray::from((
"",
arrow2::array::BooleanArray::from_iter(repeat(Some(false)).take(length)),
)));
Self::new(empty.field, empty.flat_child, validity)
let validity = arrow2::bitmap::Bitmap::from_iter(repeat(false).take(length));
Self::new(empty.field, empty.flat_child, Some(validity))
}

fn empty(name: &str, dtype: &DataType) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-core/src/array/ops/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ mod tests {
);
let flat_child = Int32Array::from(("foo", (0..6).collect::<Vec<i32>>()));
let raw_validity = vec![true, false, true];
let validity = Some(BooleanArray::from(("foo", raw_validity.as_slice())));
let validity = Some(arrow2::bitmap::Bitmap::from(raw_validity.as_slice()));
let arr = FixedSizeListArray::new(field, flat_child.into_series(), validity);
assert_eq!(arr.len(), 3);

Expand Down
6 changes: 4 additions & 2 deletions src/daft-core/src/array/ops/if_else.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl FixedSizeListArray {
});

let results = results.collect::<Vec<_>>();
let validity = BooleanArray::from((
let _validity = BooleanArray::from((
"",
results
.iter()
Expand All @@ -368,7 +368,9 @@ impl FixedSizeListArray {
Ok(FixedSizeListArray::new(
self.field.clone(),
child_series,
Some(validity),
Some(arrow2::bitmap::Bitmap::from_iter(
results.iter().map(|(_, v)| *v),
)),
))
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/daft-core/src/array/ops/len.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ impl PythonArray {
}
}

/// From arrow2 private method (arrow2::compute::aggregate::validity_size)
fn validity_size(validity: Option<&arrow2::bitmap::Bitmap>) -> usize {
validity.as_ref().map(|b| b.as_slice().0.len()).unwrap_or(0)
}

impl FixedSizeListArray {
pub fn size_bytes(&self) -> DaftResult<usize> {
Ok(self.flat_child.size_bytes()?
+ self
.validity
.as_ref()
.map(|v| v.size_bytes())
.unwrap_or(Ok(0))?)
Ok(self.flat_child.size_bytes()? + validity_size(self.validity.as_ref()))
}
}
7 changes: 5 additions & 2 deletions src/daft-core/src/array/ops/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ impl DaftIsNull for FixedSizeListArray {
fn is_null(&self) -> Self::Output {
match &self.validity {
None => Ok(BooleanArray::from((
"",
self.name(),
repeat(false)
.take(self.len())
.collect::<Vec<_>>()
.as_slice(),
))),
Some(validity) => Ok(validity.clone()),
Some(validity) => Ok(BooleanArray::from((
self.name(),
validity.into_iter().collect::<Vec<_>>().as_slice(),
))),
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions src/daft-core/src/array/ops/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
DataType, IntoSeries,
};
use common_error::DaftResult;
use num_traits::ToPrimitive;

use super::as_arrow::AsArrow;

Expand Down Expand Up @@ -88,7 +89,6 @@ impl crate::datatypes::PythonArray {
use crate::datatypes::PythonType;

use arrow2::array::Array;
use arrow2::types::Index;
use pyo3::prelude::*;

let indices = idx.as_arrow();
Expand All @@ -102,7 +102,7 @@ impl crate::datatypes::PythonArray {
indices
.iter()
.map(|maybe_idx| match maybe_idx {
Some(idx) => old_values[idx.to_usize()].clone(),
Some(idx) => old_values[arrow2::types::Index::to_usize(idx)].clone(),
None => py_none.clone(),
})
.collect()
Expand Down Expand Up @@ -168,10 +168,16 @@ impl FixedSizeListArray {
)),
))
.into_series();
let taken_validity = self.validity.as_ref().map(|v| {
arrow2::bitmap::Bitmap::from_iter(idx.into_iter().map(|i| match i {
None => false,
Some(i) => v.get_bit(i.to_usize().unwrap()),
}))
});
Ok(Self::new(
self.field.clone(),
self.flat_child.take(&child_idx)?,
self.validity.as_ref().map(|v| v.take(idx).unwrap()),
taken_validity,
))
}
}
Expand Down
48 changes: 19 additions & 29 deletions src/daft-core/src/datatypes/nested_arrays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use std::sync::Arc;

use common_error::{DaftError, DaftResult};

use crate::datatypes::{BooleanArray, DaftArrayType, Field};
use crate::datatypes::{DaftArrayType, Field};
use crate::series::Series;
use crate::DataType;

#[derive(Clone)]
pub struct FixedSizeListArray {
pub field: Arc<Field>,
pub flat_child: Series,
pub(crate) validity: Option<BooleanArray>,
pub validity: Option<arrow2::bitmap::Bitmap>,
}

impl DaftArrayType for FixedSizeListArray {}
Expand All @@ -20,7 +20,7 @@ impl FixedSizeListArray {
pub fn new<F: Into<Arc<Field>>>(
field: F,
flat_child: Series,
validity: Option<BooleanArray>,
validity: Option<arrow2::bitmap::Bitmap>,
) -> Self {
let field: Arc<Field> = field.into();
match &field.as_ref().dtype {
Expand All @@ -38,12 +38,6 @@ impl FixedSizeListArray {
field
)
}
// validity must not contain any null values
if let Some(validity) = validity.as_ref() && let Some(validity) = validity.data().validity() {
validity.iter().for_each(|v| if !v {
panic!("FixedSizeListArray validity BooleanArray mask must contain all valid values")
})
}
FixedSizeListArray {
field,
flat_child,
Expand All @@ -65,18 +59,14 @@ impl FixedSizeListArray {
None
} else {
let lens = arrays.iter().map(|a| a.len());
let concatted_validities: Vec<bool> = validities
.iter()
.zip(lens)
.flat_map(|(v, l)| {
let x: Box<dyn Iterator<Item = bool>> = match v {
None => Box::new(repeat(true).take(l)),
Some(v) => Box::new(v.into_iter().map(|x| x.unwrap())),
};
x
})
.collect();
Some(BooleanArray::from(("", concatted_validities.as_slice())))
let concatted_validities = validities.iter().zip(lens).flat_map(|(v, l)| {
let x: Box<dyn Iterator<Item = bool>> = match v {
None => Box::new(repeat(true).take(l)),
Some(v) => Box::new(v.into_iter()),
};
x
});
Some(arrow2::bitmap::Bitmap::from_iter(concatted_validities))
};

Ok(Self::new(
Expand Down Expand Up @@ -127,19 +117,16 @@ impl FixedSizeListArray {
Ok(Self::new(
self.field.clone(),
self.flat_child.slice(start * size, end * size)?,
self.validity.as_ref().map(|v| v.slice(start, end).unwrap()),
self.validity.as_ref().map(|v| v.clone().sliced(start, end)),
))
}

pub fn to_arrow(&self) -> Box<dyn arrow2::array::Array> {
let arrow_dtype = self.data_type().to_arrow().unwrap();
let arrow_validity = self.validity.as_ref().map(|validity| {
arrow2::bitmap::Bitmap::from_iter(validity.into_iter().map(|v| v.unwrap()))
});
Box::new(arrow2::array::FixedSizeListArray::new(
arrow_dtype,
self.flat_child.to_arrow(),
arrow_validity,
self.validity.clone(),
))
}

Expand All @@ -157,7 +144,7 @@ mod tests {
use common_error::DaftResult;

use crate::{
datatypes::{BooleanArray, Field, Int32Array},
datatypes::{Field, Int32Array},
DataType, IntoSeries,
};

Expand All @@ -171,8 +158,11 @@ mod tests {
);
let num_valid_elements = validity.iter().map(|v| if *v { 1 } else { 0 }).sum();
let flat_child = Int32Array::from(("foo", (0..num_valid_elements).collect::<Vec<i32>>()));
let validity = Some(BooleanArray::from(("foo", validity)));
FixedSizeListArray::new(field, flat_child.into_series(), validity)
FixedSizeListArray::new(
field,
flat_child.into_series(),
Some(arrow2::bitmap::Bitmap::from(validity)),
)
}

#[test]
Expand Down

0 comments on commit 931758a

Please sign in to comment.