From 98dbadb275a5e7df62289769b2e37dc18ea338f4 Mon Sep 17 00:00:00 2001 From: Andrew Gazelka Date: Mon, 7 Oct 2024 11:36:54 -0700 Subject: [PATCH] [FEAT] add `list.value_counts()` (#2902) This PR implements the `list.value_counts()` function and refactors Map types to use a more explicit key-value structure. It also includes various improvements and bug fixes across the codebase. ## Key changes 1. **Implemented list.value_counts() function** 2. **Refactored Map type representation** - Updated `DataType` enum in `src/daft-schema/src/dtype.rs` to use explicit key and value types 3. **Improved error handling and type checking** - Enhanced type checking in various parts of the codebase - Improved error messages for better debugging 4. **Performance optimizations** - Refactored some operations to be more efficient, especially in list and map operations 5. **Code cleanup and minor improvements** - Removed unnecessary clones and improved code readability - Updated comments and documentation --- .cargo/config.toml | 2 + daft/daft/__init__.pyi | 1 + daft/expressions/expressions.py | 64 ++++-- src/arrow2/src/array/list/mod.rs | 10 +- src/arrow2/src/array/map/mod.rs | 81 ++++++- src/arrow2/src/array/mod.rs | 214 ++++++++++++++---- src/arrow2/src/array/struct_/mod.rs | 9 + src/arrow2/src/compute/cast/mod.rs | 8 +- src/arrow2/src/datatypes/mod.rs | 61 ++++- src/arrow2/src/error.rs | 1 + src/arrow2/src/offset.rs | 68 ++++-- src/common/tracing/src/lib.rs | 60 ++--- .../src/array/fixed_size_list_array.rs | 26 ++- src/daft-core/src/array/list_array.rs | 11 + src/daft-core/src/array/mod.rs | 47 ++-- .../src/array/ops/arrow2/comparison.rs | 2 +- src/daft-core/src/array/ops/cast.rs | 7 +- src/daft-core/src/array/ops/from_arrow.rs | 68 +++--- src/daft-core/src/array/ops/groups.rs | 14 +- src/daft-core/src/array/ops/list.rs | 143 +++++++++++- src/daft-core/src/array/ops/map.rs | 87 ++++--- src/daft-core/src/array/ops/sparse_tensor.rs | 11 +- src/daft-core/src/array/serdes.rs | 6 +- src/daft-core/src/array/struct_array.rs | 2 + src/daft-core/src/datatypes/logical.rs | 1 + src/daft-core/src/datatypes/matching.rs | 54 ++--- src/daft-core/src/lib.rs | 1 + src/daft-core/src/series/from.rs | 92 +++++++- src/daft-core/src/series/mod.rs | 7 + src/daft-core/src/series/ops/concat.rs | 41 ++-- src/daft-core/src/series/ops/list.rs | 16 ++ src/daft-core/src/series/ops/map.rs | 13 +- src/daft-core/src/series/serdes.rs | 5 +- src/daft-dsl/src/functions/map/get.rs | 52 ++--- src/daft-functions/src/list/mod.rs | 5 + src/daft-functions/src/list/value_counts.rs | 72 ++++++ src/daft-micropartition/src/lib.rs | 1 + src/daft-micropartition/src/micropartition.rs | 12 +- .../src/ops/eval_expressions.rs | 10 +- src/daft-schema/src/dtype.rs | 126 +++++++---- src/daft-schema/src/field.rs | 9 + src/daft-schema/src/python/datatype.rs | 8 +- src/daft-schema/src/schema.rs | 22 +- src/daft-sql/src/planner.rs | 2 +- src/daft-stats/src/column_stats/mod.rs | 2 +- src/daft-stats/src/table_stats.rs | 13 +- src/daft-table/src/ffi.rs | 6 +- src/daft-table/src/lib.rs | 33 ++- src/daft-table/src/repr_html.rs | 2 +- tests/expressions/test_expressions.py | 75 ++++++ tests/integration/iceberg/test_table_load.py | 2 +- tests/io/test_parquet_roundtrip.py | 34 ++- tests/table/map/test_map_get.py | 3 +- 53 files changed, 1302 insertions(+), 420 deletions(-) create mode 100644 .cargo/config.toml create mode 100644 src/daft-functions/src/list/value_counts.rs diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000000..9affa24854 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[env] +PYO3_PYTHON = "./.venv/bin/python" diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index f6a496b1d7..4f465eb5fe 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1284,6 +1284,7 @@ def dt_truncate(expr: PyExpr, interval: str, relative_to: PyExpr) -> PyExpr: ... # --- def explode(expr: PyExpr) -> PyExpr: ... def list_sort(expr: PyExpr, desc: PyExpr) -> PyExpr: ... +def list_value_counts(expr: PyExpr) -> PyExpr: ... def list_join(expr: PyExpr, delimiter: PyExpr) -> PyExpr: ... def list_count(expr: PyExpr, mode: CountMode) -> PyExpr: ... def list_get(expr: PyExpr, idx: PyExpr, default: PyExpr) -> PyExpr: ... diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py index ae3655b430..52d1cae0d8 100644 --- a/daft/expressions/expressions.py +++ b/daft/expressions/expressions.py @@ -2930,6 +2930,40 @@ def join(self, delimiter: str | Expression) -> Expression: delimiter_expr = Expression._to_expression(delimiter) return Expression._from_pyexpr(native.list_join(self._expr, delimiter_expr._expr)) + def value_counts(self) -> Expression: + """Counts the occurrences of each unique value in the list. + + Returns: + Expression: A Map expression where the keys are unique elements from the + original list of type X, and the values are UInt64 counts representing + the number of times each element appears in the list. + + Note: + This function does not work for nested types. For example, it will not produce a map + with lists as keys. + + Example: + >>> import daft + >>> df = daft.from_pydict({"letters": [["a", "b", "a"], ["b", "c", "b", "c"]]}) + >>> df.with_column("value_counts", df["letters"].list.value_counts()).collect() + ╭──────────────┬───────────────────╮ + │ letters ┆ value_counts │ + │ --- ┆ --- │ + │ List[Utf8] ┆ Map[Utf8: UInt64] │ + ╞══════════════╪═══════════════════╡ + │ [a, b, a] ┆ [{key: a, │ + │ ┆ value: 2, │ + │ ┆ }, {key: … │ + ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ + │ [b, c, b, c] ┆ [{key: b, │ + │ ┆ value: 2, │ + │ ┆ }, {key: … │ + ╰──────────────┴───────────────────╯ + + (Showing first 2 of 2 rows) + """ + return Expression._from_pyexpr(native.list_value_counts(self._expr)) + def count(self, mode: CountMode = CountMode.Valid) -> Expression: """Counts the number of elements in each list @@ -3092,21 +3126,21 @@ def get(self, key: Expression) -> Expression: >>> df = daft.from_arrow(pa.table({"map_col": pa_array})) >>> df = df.with_column("a", df["map_col"].map.get("a")) >>> df.show() - ╭──────────────────────────────────────┬───────╮ - │ map_col ┆ a │ - │ --- ┆ --- │ - │ Map[Struct[key: Utf8, value: Int64]] ┆ Int64 │ - ╞══════════════════════════════════════╪═══════╡ - │ [{key: a, ┆ 1 │ - │ value: 1, ┆ │ - │ }] ┆ │ - ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤ - │ [] ┆ None │ - ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤ - │ [{key: b, ┆ None │ - │ value: 2, ┆ │ - │ }] ┆ │ - ╰──────────────────────────────────────┴───────╯ + ╭──────────────────┬───────╮ + │ map_col ┆ a │ + │ --- ┆ --- │ + │ Map[Utf8: Int64] ┆ Int64 │ + ╞══════════════════╪═══════╡ + │ [{key: a, ┆ 1 │ + │ value: 1, ┆ │ + │ }] ┆ │ + ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤ + │ [] ┆ None │ + ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤ + │ [{key: b, ┆ None │ + │ value: 2, ┆ │ + │ }] ┆ │ + ╰──────────────────┴───────╯ (Showing first 3 of 3 rows) diff --git a/src/arrow2/src/array/list/mod.rs b/src/arrow2/src/array/list/mod.rs index 3948e12002..6d0735ca04 100644 --- a/src/arrow2/src/array/list/mod.rs +++ b/src/arrow2/src/array/list/mod.rs @@ -209,12 +209,18 @@ impl ListArray { if O::IS_LARGE { match data_type.to_logical_type() { DataType::LargeList(child) => Ok(child.as_ref()), - _ => Err(Error::oos("ListArray expects DataType::LargeList")), + got => { + let msg = format!("ListArray expects DataType::LargeList, but got {got:?}"); + Err(Error::oos(msg)) + }, } } else { match data_type.to_logical_type() { DataType::List(child) => Ok(child.as_ref()), - _ => Err(Error::oos("ListArray expects DataType::List")), + got => { + let msg = format!("ListArray expects DataType::List, but got {got:?}"); + Err(Error::oos(msg)) + }, } } } diff --git a/src/arrow2/src/array/map/mod.rs b/src/arrow2/src/array/map/mod.rs index d0dcb46efb..d49a463a2f 100644 --- a/src/arrow2/src/array/map/mod.rs +++ b/src/arrow2/src/array/map/mod.rs @@ -1,3 +1,4 @@ +use super::{new_empty_array, specification::try_check_offsets_bounds, Array, ListArray}; use crate::{ bitmap::Bitmap, datatypes::{DataType, Field}, @@ -5,8 +6,6 @@ use crate::{ offset::OffsetsBuffer, }; -use super::{new_empty_array, specification::try_check_offsets_bounds, Array}; - mod ffi; pub(super) mod fmt; mod iterator; @@ -41,20 +40,27 @@ impl MapArray { try_check_offsets_bounds(&offsets, field.len())?; let inner_field = Self::try_get_field(&data_type)?; - if let DataType::Struct(inner) = inner_field.data_type() { - if inner.len() != 2 { - return Err(Error::InvalidArgumentError( - "MapArray's inner `Struct` must have 2 fields (keys and maps)".to_string(), - )); - } - } else { + + let inner_data_type = inner_field.data_type(); + let DataType::Struct(inner) = inner_data_type else { return Err(Error::InvalidArgumentError( - "MapArray expects `DataType::Struct` as its inner logical type".to_string(), + format!("MapArray expects `DataType::Struct` as its inner logical type, but found {inner_data_type:?}"), )); + }; + + let inner_len = inner.len(); + if inner_len != 2 { + let msg = format!( + "MapArray's inner `Struct` must have 2 fields (keys and maps), but found {} fields", + inner_len + ); + return Err(Error::InvalidArgumentError(msg)); } - if field.data_type() != inner_field.data_type() { + + let field_data_type = field.data_type(); + if field_data_type != inner_field.data_type() { return Err(Error::InvalidArgumentError( - "MapArray expects `field.data_type` to match its inner DataType".to_string(), + format!("MapArray expects `field.data_type` to match its inner DataType, but found \n{field_data_type:?}\nvs\n\n\n{inner_field:?}"), )); } @@ -195,6 +201,57 @@ impl MapArray { impl Array for MapArray { impl_common_array!(); + fn convert_logical_type(&self, target_data_type: DataType) -> Box { + let is_target_map = matches!(target_data_type, DataType::Map { .. }); + + let DataType::Map(current_field, _) = self.data_type() else { + unreachable!( + "Expected MapArray to have Map data type, but found {:?}", + self.data_type() + ); + }; + + if is_target_map { + // For Map-to-Map conversions, we can clone + // (same top level representation we are still a Map). and then change the subtype in + // place. + let mut converted_array = self.to_boxed(); + converted_array.change_type(target_data_type); + return converted_array; + } + + // Target type is a LargeList, so we need to convert to a ListArray before converting + let DataType::LargeList(target_field) = &target_data_type else { + panic!("MapArray can only be converted to Map or LargeList, but target type is {target_data_type:?}"); + }; + + + let current_physical_type = current_field.data_type.to_physical_type(); + let target_physical_type = target_field.data_type.to_physical_type(); + + if current_physical_type != target_physical_type { + panic!( + "Inner physical types must be equal for conversion. Current: {:?}, Target: {:?}", + current_physical_type, target_physical_type + ); + } + + let mut converted_field = self.field.clone(); + converted_field.change_type(target_field.data_type.clone()); + + let original_offsets = self.offsets().clone(); + let converted_offsets = unsafe { original_offsets.map_unchecked(|offset| offset as i64) }; + + let converted_list = ListArray::new( + target_data_type, + converted_offsets, + converted_field, + self.validity.clone(), + ); + + Box::new(converted_list) + } + fn validity(&self) -> Option<&Bitmap> { self.validity.as_ref() } diff --git a/src/arrow2/src/array/mod.rs b/src/arrow2/src/array/mod.rs index f77cc5d60d..815e51985b 100644 --- a/src/arrow2/src/array/mod.rs +++ b/src/arrow2/src/array/mod.rs @@ -17,13 +17,12 @@ //! //! Most arrays contain a [`MutableArray`] counterpart that is neither clonable nor sliceable, but //! can be operated in-place. -use std::any::Any; -use std::sync::Arc; +use std::{any::Any, sync::Arc}; -use crate::error::Result; use crate::{ bitmap::{Bitmap, MutableBitmap}, datatypes::DataType, + error::Result, }; mod physical_binary; @@ -55,6 +54,21 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static { /// When the validity is [`None`], all slots are valid. fn validity(&self) -> Option<&Bitmap>; + /// Returns an iterator over the direct children of this Array. + /// + /// This method is useful for accessing child Arrays in composite types such as struct arrays. + /// By default, it returns an empty iterator, as most array types do not have child arrays. + /// + /// # Returns + /// A boxed iterator yielding mutable references to child Arrays. + /// + /// # Examples + /// For a StructArray, this would return an iterator over its field arrays. + /// For most other array types, this returns an empty iterator. + fn direct_children<'a>(&'a mut self) -> Box + 'a> { + Box::new(core::iter::empty()) + } + /// The number of null slots on this [`Array`]. /// # Implementation /// This is `O(1)` since the number of null elements is pre-computed. @@ -144,46 +158,51 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static { /// Clone a `&dyn Array` to an owned `Box`. fn to_boxed(&self) -> Box; - /// Overwrites [`Array`]'s type with a different logical type. + /// Changes the logical type of this array in-place. /// - /// This function is useful to assign a different [`DataType`] to the array. - /// Used to change the arrays' logical type (see example). This updates the array - /// in place and does not clone the array. - /// # Example - /// ```rust,ignore - /// use arrow2::array::Int32Array; - /// use arrow2::datatypes::DataType; + /// This method modifies the array's `DataType` without changing its underlying data. + /// It's useful for reinterpreting the logical meaning of the data (e.g., from Int32 to Date32). + /// + /// # Arguments + /// * `data_type` - The new [`DataType`] to assign to this array. /// - /// let &mut array = Int32Array::from(&[Some(1), None, Some(2)]) - /// array.to(DataType::Date32); - /// assert_eq!( - /// format!("{:?}", array), - /// "Date32[1970-01-02, None, 1970-01-03]" - /// ); - /// ``` /// # Panics - /// Panics iff the `data_type`'s [`PhysicalType`] is not equal to array's `PhysicalType`. + /// Panics if the new `data_type`'s [`PhysicalType`] is not equal to the array's current [`PhysicalType`]. + /// + /// # Example + /// ``` + /// # use arrow2::array::{Array, Int32Array}; + /// # use arrow2::datatypes::DataType; + /// let mut array = Int32Array::from(&[Some(1), None, Some(2)]); + /// array.change_type(DataType::Date32); + /// assert_eq!(array.data_type(), &DataType::Date32); + /// ``` fn change_type(&mut self, data_type: DataType); - /// Returns a new [`Array`] with a different logical type. + /// Creates a new [`Array`] with a different logical type. /// - /// This function is useful to assign a different [`DataType`] to the array. - /// Used to change the arrays' logical type (see example). Unlike, this clones the array - /// in order to return a new array. - /// # Example - /// ```rust,ignore - /// use arrow2::array::Int32Array; - /// use arrow2::datatypes::DataType; + /// This method returns a new array with the specified `DataType`, leaving the original array unchanged. + /// It's useful for creating a new view of the data with a different logical interpretation. + /// + /// # Arguments + /// * `data_type` - The [`DataType`] for the new array. + /// + /// # Returns + /// A new `Box` with the specified `DataType`. /// - /// let array = Int32Array::from(&[Some(1), None, Some(2)]).to(DataType::Date32); - /// assert_eq!( - /// format!("{:?}", array), - /// "Date32[1970-01-02, None, 1970-01-03]" - /// ); - /// ``` /// # Panics - /// Panics iff the `data_type`'s [`PhysicalType`] is not equal to array's `PhysicalType`. - fn to_type(&self, data_type: DataType) -> Box { + /// Panics if the new `data_type`'s [`PhysicalType`] is not equal to the array's current [`PhysicalType`]. + /// + /// # Example + /// ``` + /// # use arrow2::array::Int32Array; + /// # use arrow2::datatypes::DataType; + /// let array = Int32Array::from(&[Some(1), None, Some(2)]); + /// let new_array = array.convert_logical_type(DataType::Date32); + /// assert_eq!(new_array.data_type(), &DataType::Date32); + /// assert_eq!(array.data_type(), &DataType::Int32); // Original array unchanged + /// ``` + fn convert_logical_type(&self, data_type: DataType) -> Box { let mut new = self.to_boxed(); new.change_type(data_type); new @@ -634,14 +653,21 @@ macro_rules! impl_common_array { fn change_type(&mut self, data_type: DataType) { if data_type.to_physical_type() != self.data_type().to_physical_type() { panic!( - "Converting array with logical type {:?} to logical type {:?} failed, physical types do not match: {:?} -> {:?}", + "Cannot change array type from {:?} to {:?}", self.data_type(), - data_type, - self.data_type().to_physical_type(), - data_type.to_physical_type(), + data_type ); } - self.data_type = data_type; + + self.data_type = data_type.clone(); + let mut children = self.direct_children(); + + data_type.direct_children(|child| { + let Some(child_elem) = children.next() else { + return; + }; + child_elem.change_type(child.clone()); + }) } }; } @@ -710,17 +736,15 @@ pub mod dyn_ord; pub mod growable; pub mod ord; -pub(crate) use iterator::ArrayAccessor; -pub use iterator::ArrayValuesIter; - -pub use equal::equal; -pub use fmt::{get_display, get_value_display}; - pub use binary::{BinaryArray, BinaryValueIter, MutableBinaryArray, MutableBinaryValuesArray}; pub use boolean::{BooleanArray, MutableBooleanArray}; pub use dictionary::{DictionaryArray, DictionaryKey, MutableDictionaryArray}; +pub use equal::equal; pub use fixed_size_binary::{FixedSizeBinaryArray, MutableFixedSizeBinaryArray}; pub use fixed_size_list::{FixedSizeListArray, MutableFixedSizeListArray}; +pub use fmt::{get_display, get_value_display}; +pub(crate) use iterator::ArrayAccessor; +pub use iterator::ArrayValuesIter; pub use list::{ListArray, ListValuesIter, MutableListArray}; pub use map::MapArray; pub use null::{MutableNullArray, NullArray}; @@ -729,9 +753,7 @@ pub use struct_::{MutableStructArray, StructArray}; pub use union::UnionArray; pub use utf8::{MutableUtf8Array, MutableUtf8ValuesArray, Utf8Array, Utf8ValuesIter}; -pub(crate) use self::ffi::offset_buffers_children_dictionary; -pub(crate) use self::ffi::FromFfi; -pub(crate) use self::ffi::ToFfi; +pub(crate) use self::ffi::{offset_buffers_children_dictionary, FromFfi, ToFfi}; /// A trait describing the ability of a struct to create itself from a iterator. /// This is similar to [`Extend`], but accepted the creation to error. @@ -774,3 +796,97 @@ pub unsafe trait GenericBinaryArray: Array { /// The offsets of the array fn offsets(&self) -> &[O]; } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + array::{ + BooleanArray, Float32Array, Int32Array, Int64Array, ListArray, MapArray, StructArray, + UnionArray, Utf8Array, + }, + datatypes::{DataType, Field, IntervalUnit, TimeUnit}, + }; + + #[test] + fn test_int32_to_date32() { + let array = Int32Array::from_slice([1, 2, 3]); + let result = array.convert_logical_type(DataType::Date32); + assert_eq!(result.data_type(), &DataType::Date32); + } + + #[test] + fn test_int64_to_timestamp() { + let array = Int64Array::from_slice([1000, 2000, 3000]); + let result = array.convert_logical_type(DataType::Timestamp(TimeUnit::Millisecond, None)); + assert_eq!( + result.data_type(), + &DataType::Timestamp(TimeUnit::Millisecond, None) + ); + } + + #[test] + fn test_boolean_to_boolean() { + let array = BooleanArray::from_slice([true, false, true]); + let result = array.convert_logical_type(DataType::Boolean); + assert_eq!(result.data_type(), &DataType::Boolean); + } + + #[test] + fn test_list_to_list() { + let values = Int32Array::from_slice([1, 2, 3, 4, 5]); + let offsets = vec![0, 2, 5]; + let list_array = ListArray::try_new( + DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + offsets.try_into().unwrap(), + Box::new(values), + None, + ) + .unwrap(); + let result = list_array.convert_logical_type(DataType::List(Box::new(Field::new( + "item", + DataType::Int32, + true, + )))); + assert_eq!( + result.data_type(), + &DataType::List(Box::new(Field::new("item", DataType::Int32, true))) + ); + } + + #[test] + fn test_struct_to_struct() { + let boolean = BooleanArray::from_slice([true, false, true]); + let int = Int32Array::from_slice([1, 2, 3]); + let struct_array = StructArray::try_new( + DataType::Struct(vec![ + Field::new("b", DataType::Boolean, true), + Field::new("i", DataType::Int32, true), + ]), + vec![ + Box::new(boolean) as Box, + Box::new(int) as Box, + ], + None, + ) + .unwrap(); + let result = struct_array.convert_logical_type(DataType::Struct(vec![ + Field::new("b", DataType::Boolean, true), + Field::new("i", DataType::Int32, true), + ])); + assert_eq!( + result.data_type(), + &DataType::Struct(vec![ + Field::new("b", DataType::Boolean, true), + Field::new("i", DataType::Int32, true), + ]) + ); + } + + #[test] + #[should_panic] + fn test_invalid_conversion() { + let array = Int32Array::from_slice([1, 2, 3]); + array.convert_logical_type(DataType::Utf8); + } +} diff --git a/src/arrow2/src/array/struct_/mod.rs b/src/arrow2/src/array/struct_/mod.rs index fb2812375c..f096e1aeb6 100644 --- a/src/arrow2/src/array/struct_/mod.rs +++ b/src/arrow2/src/array/struct_/mod.rs @@ -1,3 +1,4 @@ +use std::ops::DerefMut; use crate::{ bitmap::Bitmap, datatypes::{DataType, Field}, @@ -246,6 +247,14 @@ impl StructArray { impl Array for StructArray { impl_common_array!(); + fn direct_children<'a>(&'a mut self) -> Box + 'a> { + let iter = self.values + .iter_mut() + .map(|x| x.deref_mut()); + + Box::new(iter) + } + fn validity(&self) -> Option<&Bitmap> { self.validity.as_ref() } diff --git a/src/arrow2/src/compute/cast/mod.rs b/src/arrow2/src/compute/cast/mod.rs index b48949b215..6ad12f2cb4 100644 --- a/src/arrow2/src/compute/cast/mod.rs +++ b/src/arrow2/src/compute/cast/mod.rs @@ -506,16 +506,16 @@ pub fn cast(array: &dyn Array, to_type: &DataType, options: CastOptions) -> Resu match (from_type, to_type) { (Null, _) | (_, Null) => Ok(new_null_array(to_type.clone(), array.len())), (Extension(_, from_inner, _), Extension(_, to_inner, _)) => { - let new_arr = cast(array.to_type(*from_inner.clone()).as_ref(), to_inner, options)?; - Ok(new_arr.to_type(to_type.clone())) + let new_arr = cast(array.convert_logical_type(*from_inner.clone()).as_ref(), to_inner, options)?; + Ok(new_arr.convert_logical_type(to_type.clone())) } (Extension(_, from_inner, _), _) => { - let new_arr = cast(array.to_type(*from_inner.clone()).as_ref(), to_type, options)?; + let new_arr = cast(array.convert_logical_type(*from_inner.clone()).as_ref(), to_type, options)?; Ok(new_arr) } (_, Extension(_, to_inner, _)) => { let new_arr = cast(array, to_inner, options)?; - Ok(new_arr.to_type(to_type.clone())) + Ok(new_arr.convert_logical_type(to_type.clone())) } (Struct(from_fields), Struct(to_fields)) => match (from_fields.len(), to_fields.len()) { (from_len, to_len) if from_len == to_len => { diff --git a/src/arrow2/src/datatypes/mod.rs b/src/arrow2/src/datatypes/mod.rs index 2debc5a4f2..b5c5b1a8b5 100644 --- a/src/arrow2/src/datatypes/mod.rs +++ b/src/arrow2/src/datatypes/mod.rs @@ -5,13 +5,11 @@ mod field; mod physical_type; mod schema; +use std::{collections::BTreeMap, sync::Arc}; + pub use field::Field; pub use physical_type::*; pub use schema::Schema; - -use std::collections::BTreeMap; -use std::sync::Arc; - use serde::{Deserialize, Serialize}; /// typedef for [BTreeMap] denoting [`Field`]'s and [`Schema`]'s metadata. @@ -19,6 +17,12 @@ pub type Metadata = BTreeMap; /// typedef for [Option<(String, Option)>] descr pub(crate) type Extension = Option<(String, Option)>; +#[allow(unused_imports, reason = "used in documentation")] +use crate::array::Array; + +pub type ArrowDataType = DataType; +pub type ArrowField = Field; + /// The set of supported logical types in this crate. /// /// Each variant uniquely identifies a logical type, which define specific semantics to the data @@ -159,6 +163,55 @@ pub enum DataType { Extension(String, Box, Option), } +impl DataType { + pub fn map(field: impl Into>, keys_sorted: bool) -> Self { + Self::Map(field.into(), keys_sorted) + } + + /// Processes the direct children data types of this DataType. + /// + /// This method is useful for traversing the structure of complex data types. + /// It calls the provided closure for each immediate child data type. + /// + /// This can be used in conjunction with the [`Array::direct_children`] method + /// to process both the data types and the corresponding array data. + /// + /// # Arguments + /// + /// * `processor` - A closure that takes a reference to a DataType as its argument. + /// + /// # Examples + /// + /// ``` + /// use arrow2::datatypes::{DataType, Field}; + /// + /// let struct_type = DataType::Struct(vec![ + /// Field::new("a", DataType::Int32, true), + /// Field::new("b", DataType::Utf8, false), + /// ]); + /// + /// let mut child_types = Vec::new(); + /// struct_type.direct_children(|child_type| { + /// child_types.push(child_type); + /// }); + /// + /// assert_eq!(child_types, vec![&DataType::Int32, &DataType::Utf8]); + /// ``` + pub fn direct_children<'a>(&'a self, mut processor: impl FnMut(&'a DataType)) { + match self { + DataType::List(field) + | DataType::FixedSizeList(field, _) + | DataType::LargeList(field) + | DataType::Map(field, ..) => processor(&field.data_type), + DataType::Struct(fields) | DataType::Union(fields, _, _) => { + fields.iter().for_each(|field| processor(&field.data_type)) + } + DataType::Dictionary(_, value_type, _) => processor(value_type), + _ => {} // Other types don't have child data types + } + } +} + /// Mode of [`DataType::Union`] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum UnionMode { diff --git a/src/arrow2/src/error.rs b/src/arrow2/src/error.rs index 3b7eaadf3e..3df1e19381 100644 --- a/src/arrow2/src/error.rs +++ b/src/arrow2/src/error.rs @@ -55,6 +55,7 @@ impl Error { Self::OutOfSpec(msg.into()) } + #[allow(unused)] pub(crate) fn nyi>(msg: A) -> Self { Self::NotYetImplemented(msg.into()) } diff --git a/src/arrow2/src/offset.rs b/src/arrow2/src/offset.rs index 80b45d6680..3d7a2aa869 100644 --- a/src/arrow2/src/offset.rs +++ b/src/arrow2/src/offset.rs @@ -1,9 +1,8 @@ //! Contains the declaration of [`Offset`] use std::hint::unreachable_unchecked; -use crate::buffer::Buffer; -use crate::error::Error; pub use crate::types::Offset; +use crate::{buffer::Buffer, error::Error}; /// A wrapper type of [`Vec`] representing the invariants of Arrow's offsets. /// It is guaranteed to (sound to assume that): @@ -144,10 +143,9 @@ impl Offsets { /// Returns the last offset of this container. #[inline] pub fn last(&self) -> &O { - match self.0.last() { - Some(element) => element, - None => unsafe { unreachable_unchecked() }, - } + self.0 + .last() + .unwrap_or_else(|| unsafe { unreachable_unchecked() }) } /// Returns a range (start, end) corresponding to the position `index` @@ -338,7 +336,7 @@ fn try_check_offsets(offsets: &[O]) -> Result<(), Error> { /// * Every element is `>= 0` /// * element at position `i` is >= than element at position `i-1`. #[derive(Clone, PartialEq, Debug)] -pub struct OffsetsBuffer(Buffer); +pub struct OffsetsBuffer(Buffer); impl Default for OffsetsBuffer { #[inline] @@ -347,6 +345,39 @@ impl Default for OffsetsBuffer { } } +impl OffsetsBuffer { + + /// Maps each offset to a new value, creating a new [`Self`]. + /// + /// # Safety + /// + /// This function is marked as `unsafe` because it does not check whether the resulting offsets + /// maintain the invariants required by [`OffsetsBuffer`]. The caller must ensure that: + /// + /// - The resulting offsets are monotonically increasing. + /// - The first offset is zero. + /// - All offsets are non-negative. + /// + /// Violating these invariants can lead to undefined behavior when using the resulting [`OffsetsBuffer`]. + /// + /// # Example + /// + /// ``` + /// # use arrow2::offset::OffsetsBuffer; + /// # let offsets = unsafe { OffsetsBuffer::new_unchecked(vec![0, 2, 5, 7].into()) }; + /// let doubled = unsafe { offsets.map_unchecked(|x| x * 2) }; + /// assert_eq!(doubled.buffer().as_slice(), &[0, 4, 10, 14]); + /// ``` + /// + /// Note that in this example, doubling the offsets maintains the required invariants, + /// but this may not be true for all transformations. + pub unsafe fn map_unchecked(&self, f: impl Fn(O) -> T) -> OffsetsBuffer { + let buffer = self.0.iter().copied().map(f).collect(); + + OffsetsBuffer(buffer) + } +} + impl OffsetsBuffer { /// # Safety /// This is safe iff the invariants of this struct are guaranteed in `offsets`. @@ -401,22 +432,29 @@ impl OffsetsBuffer { *self.last() - *self.first() } + pub fn ranges(&self) -> impl Iterator> + '_ { + self.0.windows(2).map(|w| { + let from = w[0]; + let to = w[1]; + debug_assert!(from <= to, "offsets must be monotonically increasing"); + from..to + }) + } + /// Returns the first offset. #[inline] pub fn first(&self) -> &O { - match self.0.first() { - Some(element) => element, - None => unsafe { unreachable_unchecked() }, - } + self.0 + .first() + .unwrap_or_else(|| unsafe { unreachable_unchecked() }) } /// Returns the last offset. #[inline] pub fn last(&self) -> &O { - match self.0.last() { - Some(element) => element, - None => unsafe { unreachable_unchecked() }, - } + self.0 + .last() + .unwrap_or_else(|| unsafe { unreachable_unchecked() }) } /// Returns a range (start, end) corresponding to the position `index` diff --git a/src/common/tracing/src/lib.rs b/src/common/tracing/src/lib.rs index 699ad6816c..acb23cc69f 100644 --- a/src/common/tracing/src/lib.rs +++ b/src/common/tracing/src/lib.rs @@ -12,36 +12,38 @@ lazy_static! { pub fn init_tracing(enable_chrome_trace: bool) { use std::sync::atomic::Ordering; - if !TRACING_INIT.swap(true, Ordering::Relaxed) { - if enable_chrome_trace { - let mut mg = CHROME_GUARD_HANDLE.lock().unwrap(); - assert!( - mg.is_none(), - "Expected chrome flush guard to be None on init" - ); - let (chrome_layer, _guard) = ChromeLayerBuilder::new() - .trace_style(tracing_chrome::TraceStyle::Threaded) - .name_fn(Box::new(|event_or_span| { - match event_or_span { - tracing_chrome::EventOrSpan::Event(ev) => ev.metadata().name().into(), - tracing_chrome::EventOrSpan::Span(s) => { - // TODO: this is where we should extract out fields (such as node id to show the different pipelines) - s.name().into() - } - } - })) - .build(); - tracing::subscriber::set_global_default( - tracing_subscriber::registry().with(chrome_layer), - ) - .unwrap(); - *mg = Some(_guard); - } else { - // Do nothing for now - } - } else { - panic!("Cannot init tracing, already initialized!") + + if TRACING_INIT.swap(true, Ordering::Relaxed) { + panic!("Cannot init tracing, already initialized!"); } + + if !enable_chrome_trace { + return; // Do nothing for now + } + + let mut mg = CHROME_GUARD_HANDLE.lock().unwrap(); + assert!( + mg.is_none(), + "Expected chrome flush guard to be None on init" + ); + + let (chrome_layer, guard) = ChromeLayerBuilder::new() + .trace_style(tracing_chrome::TraceStyle::Threaded) + .name_fn(Box::new(|event_or_span| { + match event_or_span { + tracing_chrome::EventOrSpan::Event(ev) => ev.metadata().name().into(), + tracing_chrome::EventOrSpan::Span(s) => { + // TODO: this is where we should extract out fields (such as node id to show the different pipelines) + s.name().into() + } + } + })) + .build(); + + tracing::subscriber::set_global_default(tracing_subscriber::registry().with(chrome_layer)) + .unwrap(); + + *mg = Some(guard); } pub fn refresh_chrome_trace() -> bool { diff --git a/src/daft-core/src/array/fixed_size_list_array.rs b/src/daft-core/src/array/fixed_size_list_array.rs index a8b5048b82..bf052b477b 100644 --- a/src/daft-core/src/array/fixed_size_list_array.rs +++ b/src/daft-core/src/array/fixed_size_list_array.rs @@ -1,16 +1,19 @@ use std::sync::Arc; +use arrow2::offset::OffsetsBuffer; use common_error::{DaftError, DaftResult}; use crate::{ array::growable::{Growable, GrowableArray}, datatypes::{DaftArrayType, DataType, Field}, + prelude::ListArray, series::Series, }; #[derive(Clone, Debug)] pub struct FixedSizeListArray { pub field: Arc, + /// contains all the elements of the nested lists flattened into a single contiguous array. pub flat_child: Series, validity: Option, } @@ -37,7 +40,7 @@ impl FixedSizeListArray { "FixedSizeListArray::new received values with len {} but expected it to match len of validity {} * size: {}", flat_child.len(), validity.len(), - (validity.len() * size), + validity.len() * size, ) } if child_dtype.as_ref() != flat_child.data_type() { @@ -174,6 +177,27 @@ impl FixedSizeListArray { validity, )) } + + fn generate_offsets(&self) -> OffsetsBuffer { + let size = self.fixed_element_len(); + let len = self.len(); + + // Create new offsets + let offsets: Vec = (0..=len) + .map(|i| i64::try_from(i * size).unwrap()) + .collect(); + + OffsetsBuffer::try_from(offsets).expect("Failed to create OffsetsBuffer") + } + + pub fn to_list(&self) -> ListArray { + ListArray::new( + self.field.clone(), + self.flat_child.clone(), + self.generate_offsets(), + self.validity.clone(), + ) + } } impl<'a> IntoIterator for &'a FixedSizeListArray { diff --git a/src/daft-core/src/array/list_array.rs b/src/daft-core/src/array/list_array.rs index 538c24e716..75d7b698d7 100644 --- a/src/daft-core/src/array/list_array.rs +++ b/src/daft-core/src/array/list_array.rs @@ -12,6 +12,8 @@ use crate::{ pub struct ListArray { pub field: Arc, pub flat_child: Series, + + /// Where each row starts and ends. Null rows usually have the same start/end index, but this is not guaranteed. offsets: arrow2::offset::OffsetsBuffer, validity: Option, } @@ -201,6 +203,15 @@ impl<'a> IntoIterator for &'a ListArray { } } +impl ListArray { + pub fn iter(&self) -> ListArrayIter<'_> { + ListArrayIter { + array: self, + idx: 0, + } + } +} + pub struct ListArrayIter<'a> { array: &'a ListArray, idx: usize, diff --git a/src/daft-core/src/array/mod.rs b/src/daft-core/src/array/mod.rs index 7c300c6a38..13ca7899a7 100644 --- a/src/daft-core/src/array/mod.rs +++ b/src/daft-core/src/array/mod.rs @@ -18,11 +18,12 @@ pub mod prelude; use std::{marker::PhantomData, sync::Arc}; use common_error::{DaftError, DaftResult}; +use daft_schema::field::DaftField; use crate::datatypes::{DaftArrayType, DaftPhysicalType, DataType, Field}; #[derive(Debug)] -pub struct DataArray { +pub struct DataArray { pub field: Arc, pub data: Box, marker_: PhantomData, @@ -40,30 +41,44 @@ impl DaftArrayType for DataArray { } } -impl DataArray -where - T: DaftPhysicalType, -{ - pub fn new(field: Arc, data: Box) -> DaftResult { +impl DataArray { + pub fn new( + physical_field: Arc, + arrow_array: Box, + ) -> DaftResult { assert!( - field.dtype.is_physical(), + physical_field.dtype.is_physical(), "Can only construct DataArray for PhysicalTypes, got {}", - field.dtype + physical_field.dtype ); - if let Ok(arrow_dtype) = field.dtype.to_physical().to_arrow() { - if !arrow_dtype.eq(data.data_type()) { + if let Ok(expected_arrow_physical_type) = physical_field.dtype.to_arrow() { + let arrow_data_type = arrow_array.data_type(); + + if &expected_arrow_physical_type != arrow_data_type { panic!( - "expected {:?}, got {:?} when creating a new DataArray", - arrow_dtype, - data.data_type() - ) + "Mismatch between expected and actual Arrow types for DataArray.\n\ + Field name: {}\n\ + Logical type: {}\n\ + Physical type: {}\n\ + Expected Arrow physical type: {:?}\n\ + Actual Arrow Logical type: {:?} + + This error typically occurs when there's a discrepancy between the Daft DataType \ + and the underlying Arrow representation. Please ensure that the physical type \ + of the Daft DataType matches the Arrow type of the provided data.", + physical_field.name, + physical_field.dtype, + physical_field.dtype.to_physical(), + expected_arrow_physical_type, + arrow_data_type + ); } } Ok(Self { - field, - data, + field: physical_field, + data: arrow_array, marker_: PhantomData, }) } diff --git a/src/daft-core/src/array/ops/arrow2/comparison.rs b/src/daft-core/src/array/ops/arrow2/comparison.rs index 37f7b2a37b..700ab4f8d0 100644 --- a/src/daft-core/src/array/ops/arrow2/comparison.rs +++ b/src/daft-core/src/array/ops/arrow2/comparison.rs @@ -49,7 +49,7 @@ fn build_is_equal_with_nan( } } -fn build_is_equal( +pub fn build_is_equal( left: &dyn Array, right: &dyn Array, nulls_equal: bool, diff --git a/src/daft-core/src/array/ops/cast.rs b/src/daft-core/src/array/ops/cast.rs index c3dbe0c209..83af9605b0 100644 --- a/src/daft-core/src/array/ops/cast.rs +++ b/src/daft-core/src/array/ops/cast.rs @@ -2091,7 +2091,7 @@ impl ListArray { } } } - DataType::Map(..) => Ok(MapArray::new( + DataType::Map { .. } => Ok(MapArray::new( Field::new(self.name(), dtype.clone()), self.clone(), ) @@ -2198,7 +2198,10 @@ where { Python::with_gil(|py| { let arrow_dtype = array.data_type().to_arrow()?; - let arrow_array = array.as_arrow().to_type(arrow_dtype).with_validity(None); + let arrow_array = array + .as_arrow() + .convert_logical_type(arrow_dtype) + .with_validity(None); let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; let py_array: Vec = ffi::to_py_array(py, arrow_array.to_boxed(), &pyarrow)? .call_method0(pyo3::intern!(py, "to_pylist"))? diff --git a/src/daft-core/src/array/ops/from_arrow.rs b/src/daft-core/src/array/ops/from_arrow.rs index 1739b524a9..adb339fcb2 100644 --- a/src/daft-core/src/array/ops/from_arrow.rs +++ b/src/daft-core/src/array/ops/from_arrow.rs @@ -30,17 +30,14 @@ where ::ArrayType: FromArrow, { fn from_arrow(field: FieldRef, arrow_arr: Box) -> DaftResult { - let data_array_field = Arc::new(Field::new(field.name.clone(), field.dtype.to_physical())); - let physical_arrow_arr = match field.dtype { - // TODO: Consolidate Map to use the same .to_type conversion as other logical types - // Currently, .to_type does not work for Map in Arrow2 because it requires physical types to be equivalent, - // but the physical type of MapArray in Arrow2 is a MapArray, not a ListArray - DataType::Map(..) => arrow_arr, - _ => arrow_arr.to_type(data_array_field.dtype.to_arrow()?), - }; + let target_convert = field.to_physical(); + let target_convert_arrow = target_convert.dtype.to_arrow()?; + + let physical_arrow_array = arrow_arr.convert_logical_type(target_convert_arrow.clone()); + let physical = ::ArrayType::from_arrow( - data_array_field, - physical_arrow_arr, + Arc::new(target_convert), + physical_arrow_array, )?; Ok(Self::new(field.clone(), physical)) } @@ -69,8 +66,14 @@ impl FromArrow for FixedSizeListArray { } impl FromArrow for ListArray { - fn from_arrow(field: FieldRef, arrow_arr: Box) -> DaftResult { - match (&field.dtype, arrow_arr.data_type()) { + fn from_arrow( + target_field: FieldRef, + arrow_arr: Box, + ) -> DaftResult { + let target_dtype = &target_field.dtype; + let arrow_dtype = arrow_arr.data_type(); + + let result = match (target_dtype, arrow_dtype) { ( DataType::List(daft_child_dtype), arrow2::datatypes::DataType::List(arrow_child_field), @@ -79,47 +82,40 @@ impl FromArrow for ListArray { DataType::List(daft_child_dtype), arrow2::datatypes::DataType::LargeList(arrow_child_field), ) => { - let arrow_arr = arrow_arr.to_type(arrow2::datatypes::DataType::LargeList( - arrow_child_field.clone(), - )); + // unifying lists + let arrow_arr = arrow_arr.convert_logical_type( + arrow2::datatypes::DataType::LargeList(arrow_child_field.clone()), + ); + let arrow_arr = arrow_arr .as_any() - .downcast_ref::>() + .downcast_ref::>() // list array with i64 offsets .unwrap(); + let arrow_child_array = arrow_arr.values(); let child_series = Series::from_arrow( Arc::new(Field::new("list", daft_child_dtype.as_ref().clone())), arrow_child_array.clone(), )?; Ok(Self::new( - field.clone(), + target_field.clone(), child_series, arrow_arr.offsets().clone(), arrow_arr.validity().cloned(), )) } - (DataType::List(daft_child_dtype), arrow2::datatypes::DataType::Map(..)) => { - let map_arr = arrow_arr - .as_any() - .downcast_ref::() - .unwrap(); - let arrow_child_array = map_arr.field(); - let child_series = Series::from_arrow( - Arc::new(Field::new("map", daft_child_dtype.as_ref().clone())), - arrow_child_array.clone(), - )?; - Ok(Self::new( - field.clone(), - child_series, - map_arr.offsets().into(), - arrow_arr.validity().cloned(), - )) + (DataType::List(daft_child_dtype), arrow2::datatypes::DataType::Map { .. }) => { + Err(DaftError::TypeError(format!( + "Arrow Map type should be converted to Daft Map type, not List. Attempted to create Daft ListArray with type {daft_child_dtype} from Arrow Map type.", + ))) } (d, a) => Err(DaftError::TypeError(format!( "Attempting to create Daft ListArray with type {} from arrow array with type {:?}", d, a ))), - } + }?; + + Ok(result) } } @@ -128,7 +124,7 @@ impl FromArrow for StructArray { match (&field.dtype, arrow_arr.data_type()) { (DataType::Struct(fields), arrow2::datatypes::DataType::Struct(arrow_fields)) => { if fields.len() != arrow_fields.len() { - return Err(DaftError::ValueError(format!("Attempting to create Daft StructArray with {} fields from Arrow array with {} fields: {} vs {:?}", fields.len(), arrow_fields.len(), &field.dtype, arrow_arr.data_type()))) + return Err(DaftError::ValueError(format!("Attempting to create Daft StructArray with {} fields from Arrow array with {} fields: {} vs {:?}", fields.len(), arrow_fields.len(), &field.dtype, arrow_arr.data_type()))); } let arrow_arr = arrow_arr.as_ref().as_any().downcast_ref::().unwrap(); @@ -143,7 +139,7 @@ impl FromArrow for StructArray { child_series, arrow_arr.validity().cloned(), )) - }, + } (d, a) => Err(DaftError::TypeError(format!("Attempting to create Daft StructArray with type {} from arrow array with type {:?}", d, a))) } } diff --git a/src/daft-core/src/array/ops/groups.rs b/src/daft-core/src/array/ops/groups.rs index 9676ef3a52..870c4d26bc 100644 --- a/src/daft-core/src/array/ops/groups.rs +++ b/src/daft-core/src/array/ops/groups.rs @@ -37,7 +37,7 @@ use crate::{ fn make_groups(iter: impl Iterator) -> DaftResult where T: Hash, - T: std::cmp::Eq, + T: Eq, { const DEFAULT_SIZE: usize = 256; let mut tbl = FnvHashMap::)>::with_capacity_and_hasher( @@ -56,15 +56,15 @@ where } } } - let mut s_indices = Vec::with_capacity(tbl.len()); - let mut g_indices = Vec::with_capacity(tbl.len()); + let mut sample_indices = Vec::with_capacity(tbl.len()); + let mut group_indices = Vec::with_capacity(tbl.len()); - for (s_idx, g_idx) in tbl.into_values() { - s_indices.push(s_idx); - g_indices.push(g_idx); + for (sample_index, group_index) in tbl.into_values() { + sample_indices.push(sample_index); + group_indices.push(group_index); } - Ok((s_indices, g_indices)) + Ok((sample_indices, group_indices)) } impl IntoGroups for DataArray diff --git a/src/daft-core/src/array/ops/list.rs b/src/daft-core/src/array/ops/list.rs index 4dd8cee2a8..82cbd7a5de 100644 --- a/src/daft-core/src/array/ops/list.rs +++ b/src/daft-core/src/array/ops/list.rs @@ -2,16 +2,24 @@ use std::{iter::repeat, sync::Arc}; use arrow2::offset::OffsetsBuffer; use common_error::DaftResult; +use indexmap::{ + map::{raw_entry_v1::RawEntryMut, RawEntryApiV1}, + IndexMap, +}; use super::as_arrow::AsArrow; use crate::{ array::{ growable::{make_growable, Growable}, - FixedSizeListArray, ListArray, + ops::arrow2::comparison::build_is_equal, + FixedSizeListArray, ListArray, StructArray, }, count_mode::CountMode, datatypes::{BooleanArray, DataType, Field, Int64Array, UInt64Array, Utf8Array}, + kernels::search_sorted::build_is_valid, + prelude::MapArray, series::{IntoSeries, Series}, + utils::identity_hash_set::IdentityBuildHasher, }; fn join_arrow_list_of_utf8s( @@ -244,6 +252,134 @@ fn list_sort_helper_fixed_size( } impl ListArray { + pub fn value_counts(&self) -> DaftResult { + struct IndexRef { + index: usize, + hash: u64, + } + + impl std::hash::Hash for IndexRef { + fn hash(&self, state: &mut H) { + self.hash.hash(state); + } + } + + let original_name = self.name(); + + let hashes = self.flat_child.hash(None)?; + + let flat_child = self.flat_child.to_arrow(); + let flat_child = &*flat_child; + + let is_equal = build_is_equal( + flat_child, flat_child, + false, // this value does not matter; invalid (= nulls) are never included + true, // NaNs are equal so we do not get a bunch of {Nan: 1, Nan: 1, ...} + )?; + + let is_valid = build_is_valid(flat_child); + + let key_type = self.flat_child.data_type().clone(); + let count_type = DataType::UInt64; + + let mut include_mask = Vec::with_capacity(self.flat_child.len()); + let mut count_array = Vec::new(); + + let mut offsets = Vec::with_capacity(self.len()); + + offsets.push(0_i64); + + let mut map: IndexMap = IndexMap::default(); + for range in self.offsets().ranges() { + map.clear(); + + for index in range { + let index = index as usize; + if !is_valid(index) { + include_mask.push(false); + // skip nulls + continue; + } + + let hash = hashes.get(index).unwrap(); + + let entry = map + .raw_entry_mut_v1() + .from_hash(hash, |other| is_equal(other.index, index)); + + match entry { + RawEntryMut::Occupied(mut entry) => { + include_mask.push(false); + *entry.get_mut() += 1; + } + RawEntryMut::Vacant(vacant) => { + include_mask.push(true); + vacant.insert(IndexRef { hash, index }, 1); + } + } + } + + // IndexMap maintains insertion order, so we iterate over its values + // in the same order that elements were added. This ensures that + // the count_array values correspond to the same order in which + // the include_mask was set earlier in the loop. Each 'true' in + // include_mask represents a unique key, and its corresponding + // count is now added to count_array in the same sequence. + for v in map.values() { + count_array.push(*v); + } + + offsets.push(count_array.len() as i64); + } + + let values = UInt64Array::from(("count", count_array)).into_series(); + let include_mask = BooleanArray::from(("boolean", include_mask.as_slice())); + + let keys = self.flat_child.filter(&include_mask)?; + + let keys = Series::try_from_field_and_arrow_array( + Field::new("key", key_type.clone()), + keys.to_arrow(), + )?; + + let values = Series::try_from_field_and_arrow_array( + Field::new("value", count_type.clone()), + values.to_arrow(), + )?; + + let struct_type = DataType::Struct(vec![ + Field::new("key", key_type.clone()), + Field::new("value", count_type.clone()), + ]); + + let struct_array = StructArray::new( + Arc::new(Field::new("entries", struct_type.clone())), + vec![keys, values], + None, + ); + + let list_type = DataType::List(Box::new(struct_type)); + + let offsets = OffsetsBuffer::try_from(offsets)?; + + let list_array = Self::new( + Arc::new(Field::new("entries", list_type.clone())), + struct_array.into_series(), + offsets, + None, + ); + + let map_type = DataType::Map { + key: Box::new(key_type), + value: Box::new(count_type), + }; + + Ok(MapArray::new( + Field::new(original_name, map_type.clone()), + list_array, + )) + } + pub fn count(&self, mode: CountMode) -> DaftResult { let counts = match (mode, self.flat_child.validity()) { (CountMode::All, _) | (CountMode::Valid, None) => { @@ -472,6 +608,11 @@ impl ListArray { } impl FixedSizeListArray { + pub fn value_counts(&self) -> DaftResult { + let list = self.to_list(); + list.value_counts() + } + pub fn count(&self, mode: CountMode) -> DaftResult { let size = self.fixed_element_len(); let counts = match (mode, self.flat_child.validity()) { diff --git a/src/daft-core/src/array/ops/map.rs b/src/daft-core/src/array/ops/map.rs index 3b2f6ffd8c..c9daafe2c4 100644 --- a/src/daft-core/src/array/ops/map.rs +++ b/src/daft-core/src/array/ops/map.rs @@ -1,4 +1,5 @@ use common_error::{DaftError, DaftResult}; +use itertools::Itertools; use crate::{ array::{ops::DaftCompare, prelude::*}, @@ -6,13 +7,21 @@ use crate::{ series::Series, }; -fn single_map_get(structs: &Series, key_to_get: &Series) -> DaftResult { +fn single_map_get( + structs: &Series, + key_to_get: &Series, + coerce_value: &DataType, +) -> DaftResult { let (keys, values) = { let struct_array = structs.struct_()?; (struct_array.get("key")?, struct_array.get("value")?) }; + let mask = keys.equal(key_to_get)?; let filtered = values.filter(&mask)?; + + let filtered = filtered.cast(coerce_value)?; + if filtered.is_empty() { Ok(Series::full_null("value", values.data_type(), 1)) } else if filtered.len() == 1 { @@ -24,19 +33,10 @@ fn single_map_get(structs: &Series, key_to_get: &Series) -> DaftResult { impl MapArray { pub fn map_get(&self, key_to_get: &Series) -> DaftResult { - let value_type = if let DataType::Map(inner_dtype) = self.data_type() { - match *inner_dtype.clone() { - DataType::Struct(fields) if fields.len() == 2 => { - fields[1].dtype.clone() - } - _ => { - return Err(DaftError::TypeError(format!( - "Expected inner type to be a struct type with two fields: key and value, got {:?}", - inner_dtype - ))) - } - } - } else { + let DataType::Map { + value: value_type, .. + } = self.data_type() + else { return Err(DaftError::TypeError(format!( "Expected input to be a map type, got {:?}", self.data_type() @@ -44,30 +44,49 @@ impl MapArray { }; match key_to_get.len() { - 1 => { - let mut result = Vec::with_capacity(self.len()); - for series in self.physical.into_iter() { - match series { - Some(s) if !s.is_empty() => result.push(single_map_get(&s, key_to_get)?), - _ => result.push(Series::full_null("value", &value_type, 1)), - } - } - Series::concat(&result.iter().collect::>()) - } - len if len == self.len() => { - let mut result = Vec::with_capacity(len); - for (i, series) in self.physical.into_iter().enumerate() { - match (series, key_to_get.slice(i, i + 1)?) { - (Some(s), k) if !s.is_empty() => result.push(single_map_get(&s, &k)?), - _ => result.push(Series::full_null("value", &value_type, 1)), - } - } - Series::concat(&result.iter().collect::>()) - } + 1 => self.get_single_key(key_to_get, value_type), + len if len == self.len() => self.get_multiple_keys(key_to_get, value_type), _ => Err(DaftError::ValueError(format!( "Expected key to have length 1 or length equal to the map length, got {}", key_to_get.len() ))), } } + + fn get_single_key(&self, key_to_get: &Series, coerce_value: &DataType) -> DaftResult { + let result: Vec<_> = self + .physical + .iter() + .map(|series| match series { + Some(s) if !s.is_empty() => single_map_get(&s, key_to_get, coerce_value), + _ => Ok(Series::full_null("value", coerce_value, 1)), + }) + .try_collect()?; + + let result: Vec<_> = result.iter().collect(); + + Series::concat(&result) + } + + fn get_multiple_keys( + &self, + key_to_get: &Series, + coerce_value: &DataType, + ) -> DaftResult { + let result: Vec<_> = self + .physical + .iter() + .enumerate() + .map(|(i, series)| match series { + Some(s) if !s.is_empty() => { + single_map_get(&s, &key_to_get.slice(i, i + 1)?, coerce_value) + } + _ => Ok(Series::full_null("value", coerce_value, 1)), + }) + .try_collect()?; + + let result: Vec<_> = result.iter().collect(); + + Series::concat(&result) + } } diff --git a/src/daft-core/src/array/ops/sparse_tensor.rs b/src/daft-core/src/array/ops/sparse_tensor.rs index 696a5996b8..e906551bb2 100644 --- a/src/daft-core/src/array/ops/sparse_tensor.rs +++ b/src/daft-core/src/array/ops/sparse_tensor.rs @@ -63,6 +63,7 @@ mod tests { Some(validity.clone()), ) .into_series(); + let indices_array = ListArray::new( Field::new("indices", DataType::List(Box::new(DataType::UInt64))), UInt64Array::from(( @@ -90,6 +91,7 @@ mod tests { Some(validity.clone()), ) .into_series(); + let dtype = DataType::SparseTensor(Box::new(DataType::Int64)); let struct_array = StructArray::new( Field::new("tensor", dtype.to_physical()), @@ -103,9 +105,12 @@ mod tests { let fixed_shape_sparse_tensor_array = sparse_tensor_array.cast(&fixed_shape_sparse_tensor_dtype)?; let roundtrip_tensor = fixed_shape_sparse_tensor_array.cast(&dtype)?; - assert!(roundtrip_tensor - .to_arrow() - .eq(&sparse_tensor_array.to_arrow())); + + let round_trip_tensor_arrow = roundtrip_tensor.to_arrow(); + let sparse_tensor_array_arrow = sparse_tensor_array.to_arrow(); + + assert_eq!(round_trip_tensor_arrow, sparse_tensor_array_arrow); + Ok(()) } } diff --git a/src/daft-core/src/array/serdes.rs b/src/daft-core/src/array/serdes.rs index cc908c0dd6..0976f53a0a 100644 --- a/src/daft-core/src/array/serdes.rs +++ b/src/daft-core/src/array/serdes.rs @@ -130,7 +130,11 @@ impl serde::Serialize for ExtensionArray { let mut s = serializer.serialize_map(Some(2))?; s.serialize_entry("field", self.field())?; let values = if let DataType::Extension(_, inner, _) = self.data_type() { - Series::try_from(("physical", self.data.to_type(inner.to_arrow().unwrap()))).unwrap() + Series::try_from(( + "physical", + self.data.convert_logical_type(inner.to_arrow().unwrap()), + )) + .unwrap() } else { panic!("Expected Extension Type!") }; diff --git a/src/daft-core/src/array/struct_array.rs b/src/daft-core/src/array/struct_array.rs index 996680ede5..8a228735e4 100644 --- a/src/daft-core/src/array/struct_array.rs +++ b/src/daft-core/src/array/struct_array.rs @@ -11,6 +11,8 @@ use crate::{ #[derive(Clone, Debug)] pub struct StructArray { pub field: Arc, + + /// Column representations pub children: Vec, validity: Option, len: usize, diff --git a/src/daft-core/src/datatypes/logical.rs b/src/daft-core/src/datatypes/logical.rs index 86d84535e1..9704b3b76f 100644 --- a/src/daft-core/src/datatypes/logical.rs +++ b/src/daft-core/src/datatypes/logical.rs @@ -44,6 +44,7 @@ impl LogicalArrayImpl { &field.dtype.to_physical(), physical.data_type() ); + Self { physical, field, diff --git a/src/daft-core/src/datatypes/matching.rs b/src/daft-core/src/datatypes/matching.rs index b8b8e1660f..c275bb4a2d 100644 --- a/src/daft-core/src/datatypes/matching.rs +++ b/src/daft-core/src/datatypes/matching.rs @@ -8,43 +8,43 @@ macro_rules! with_match_daft_types {( use $crate::datatypes::*; match $key_type { - Null => __with_ty__! { NullType }, + // Float16 => unimplemented!("Array for Float16 DataType not implemented"), + Binary => __with_ty__! { BinaryType }, Boolean => __with_ty__! { BooleanType }, - Int8 => __with_ty__! { Int8Type }, + Date => __with_ty__! { DateType }, + Decimal128(..) => __with_ty__! { Decimal128Type }, + Duration(_) => __with_ty__! { DurationType }, + Embedding(..) => __with_ty__! { EmbeddingType }, + Extension(_, _, _) => __with_ty__! { ExtensionType }, + FixedShapeImage(..) => __with_ty__! { FixedShapeImageType }, + FixedShapeSparseTensor(..) => __with_ty__! { FixedShapeSparseTensorType }, + FixedShapeTensor(..) => __with_ty__! { FixedShapeTensorType }, + FixedSizeBinary(_) => __with_ty__! { FixedSizeBinaryType }, + FixedSizeList(_, _) => __with_ty__! { FixedSizeListType }, + Float32 => __with_ty__! { Float32Type }, + Float64 => __with_ty__! { Float64Type }, + Image(..) => __with_ty__! { ImageType }, + Int128 => __with_ty__! { Int128Type }, Int16 => __with_ty__! { Int16Type }, Int32 => __with_ty__! { Int32Type }, Int64 => __with_ty__! { Int64Type }, - Int128 => __with_ty__! { Int128Type }, - UInt8 => __with_ty__! { UInt8Type }, + Int8 => __with_ty__! { Int8Type }, + List(_) => __with_ty__! { ListType }, + Map{..} => __with_ty__! { MapType }, + Null => __with_ty__! { NullType }, + SparseTensor(..) => __with_ty__! { SparseTensorType }, + Struct(_) => __with_ty__! { StructType }, + Tensor(..) => __with_ty__! { TensorType }, + Time(_) => __with_ty__! { TimeType }, + Timestamp(_, _) => __with_ty__! { TimestampType }, UInt16 => __with_ty__! { UInt16Type }, UInt32 => __with_ty__! { UInt32Type }, UInt64 => __with_ty__! { UInt64Type }, - Float32 => __with_ty__! { Float32Type }, - Float64 => __with_ty__! { Float64Type }, - Timestamp(_, _) => __with_ty__! { TimestampType }, - Date => __with_ty__! { DateType }, - Time(_) => __with_ty__! { TimeType }, - Duration(_) => __with_ty__! { DurationType }, - Binary => __with_ty__! { BinaryType }, - FixedSizeBinary(_) => __with_ty__! { FixedSizeBinaryType }, + UInt8 => __with_ty__! { UInt8Type }, + Unknown => unimplemented!("Array for Unknown DataType not implemented"), Utf8 => __with_ty__! { Utf8Type }, - FixedSizeList(_, _) => __with_ty__! { FixedSizeListType }, - List(_) => __with_ty__! { ListType }, - Struct(_) => __with_ty__! { StructType }, - Map(_) => __with_ty__! { MapType }, - Extension(_, _, _) => __with_ty__! { ExtensionType }, #[cfg(feature = "python")] Python => __with_ty__! { PythonType }, - Embedding(..) => __with_ty__! { EmbeddingType }, - Image(..) => __with_ty__! { ImageType }, - FixedShapeImage(..) => __with_ty__! { FixedShapeImageType }, - Tensor(..) => __with_ty__! { TensorType }, - FixedShapeTensor(..) => __with_ty__! { FixedShapeTensorType }, - SparseTensor(..) => __with_ty__! { SparseTensorType }, - FixedShapeSparseTensor(..) => __with_ty__! { FixedShapeSparseTensorType }, - Decimal128(..) => __with_ty__! { Decimal128Type }, - // Float16 => unimplemented!("Array for Float16 DataType not implemented"), - Unknown => unimplemented!("Array for Unknown DataType not implemented"), // NOTE: We should not implement a default for match here, because this is meant to be // an exhaustive match across **all** Daft types. diff --git a/src/daft-core/src/lib.rs b/src/daft-core/src/lib.rs index 322a0db3ec..5892f75ffb 100644 --- a/src/daft-core/src/lib.rs +++ b/src/daft-core/src/lib.rs @@ -2,6 +2,7 @@ #![feature(int_roundings)] #![feature(iterator_try_reduce)] #![feature(if_let_guard)] +#![feature(hash_raw_entry)] pub mod array; pub mod count_mode; diff --git a/src/daft-core/src/series/from.rs b/src/daft-core/src/series/from.rs index 99776edf64..fb30db3a93 100644 --- a/src/daft-core/src/series/from.rs +++ b/src/daft-core/src/series/from.rs @@ -1,6 +1,8 @@ use std::sync::Arc; +use arrow2::datatypes::ArrowDataType; use common_error::{DaftError, DaftResult}; +use daft_schema::{dtype::DaftDataType, field::DaftField}; use super::Series; use crate::{ @@ -12,9 +14,10 @@ use crate::{ impl Series { pub fn try_from_field_and_arrow_array( - field: Arc, + field: impl Into>, array: Box, ) -> DaftResult { + let field = field.into(); // TODO(Nested): Refactor this out with nested logical types in StructArray and ListArray // Corner-case nested logical types that have not yet been migrated to new Array formats // to hold only casted physical arrow arrays. @@ -46,11 +49,90 @@ impl Series { impl TryFrom<(&str, Box)> for Series { type Error = DaftError; - fn try_from(item: (&str, Box)) -> DaftResult { - let (name, array) = item; - let source_arrow_type = array.data_type(); - let dtype: DataType = source_arrow_type.into(); + fn try_from((name, array): (&str, Box)) -> DaftResult { + let source_arrow_type: &ArrowDataType = array.data_type(); + let dtype = DaftDataType::from(source_arrow_type); + let field = Arc::new(Field::new(name, dtype.clone())); Self::try_from_field_and_arrow_array(field, array) } } + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use arrow2::{ + array::Array, + datatypes::{ArrowDataType, ArrowField}, + }; + use common_error::DaftResult; + use daft_schema::dtype::DataType; + + static ARROW_DATA_TYPE: LazyLock = LazyLock::new(|| { + ArrowDataType::Map( + Box::new(ArrowField::new( + "entries", + ArrowDataType::Struct(vec![ + ArrowField::new("key", ArrowDataType::LargeUtf8, false), + ArrowField::new("value", ArrowDataType::Date32, true), + ]), + false, + )), + false, + ) + }); + + #[test] + fn test_map_type_conversion() { + let arrow_data_type = ARROW_DATA_TYPE.clone(); + let dtype = DataType::from(&arrow_data_type); + assert_eq!( + dtype, + DataType::Map { + key: Box::new(DataType::Utf8), + value: Box::new(DataType::Date), + }, + ) + } + + #[test] + fn test_map_array_conversion() -> DaftResult<()> { + use arrow2::array::MapArray; + + use super::*; + + let arrow_array = MapArray::new( + ARROW_DATA_TYPE.clone(), + vec![0, 1].try_into().unwrap(), + Box::new(arrow2::array::StructArray::new( + ArrowDataType::Struct(vec![ + ArrowField::new("key", ArrowDataType::LargeUtf8, false), + ArrowField::new("value", ArrowDataType::Date32, true), + ]), + vec![ + Box::new(arrow2::array::Utf8Array::::from_slice(["key1"])), + arrow2::array::Int32Array::from_slice([1]) + .convert_logical_type(ArrowDataType::Date32), + ], + None, + )), + None, + ); + + let series = Series::try_from(( + "test_map", + Box::new(arrow_array) as Box, + ))?; + + assert_eq!( + series.data_type(), + &DataType::Map { + key: Box::new(DataType::Utf8), + value: Box::new(DataType::Date), + } + ); + + Ok(()) + } +} diff --git a/src/daft-core/src/series/mod.rs b/src/daft-core/src/series/mod.rs index 276fdfde87..5dfbd26502 100644 --- a/src/daft-core/src/series/mod.rs +++ b/src/daft-core/src/series/mod.rs @@ -123,6 +123,13 @@ impl Series { self.inner.validity() } + pub fn is_valid(&self, idx: usize) -> bool { + let Some(validity) = self.validity() else { + return true; + }; + validity.get_bit(idx) + } + /// Attempts to downcast the Series to a primitive slice /// This will return an error if the Series is not of the physical type `T` /// # Example diff --git a/src/daft-core/src/series/ops/concat.rs b/src/daft-core/src/series/ops/concat.rs index 9103255faf..94e275fc75 100644 --- a/src/daft-core/src/series/ops/concat.rs +++ b/src/daft-core/src/series/ops/concat.rs @@ -7,30 +7,29 @@ use crate::{ impl Series { pub fn concat(series: &[&Self]) -> DaftResult { - if series.is_empty() { - return Err(DaftError::ValueError( - "Need at least 1 series to perform concat".to_string(), - )); - } + let all_types: Vec<_> = series.iter().map(|s| s.data_type().clone()).collect(); - if series.len() == 1 { - return Ok((*series.first().unwrap()).clone()); - } + match series { + [] => Err(DaftError::ValueError( + "Need at least 1 series to perform concat".to_string(), + )), + [single_series] => Ok((*single_series).clone()), + [first, rest @ ..] => { + let first_dtype = first.data_type(); + for s in rest.iter() { + if first_dtype != s.data_type() { + return Err(DaftError::TypeError(format!( + "Series concat requires all data types to match. Found mismatched types. All types: {:?}", + all_types + ))); + } + } - let first_dtype = series.first().unwrap().data_type(); - for s in series.iter().skip(1) { - if first_dtype != s.data_type() { - return Err(DaftError::TypeError(format!( - "Series concat requires all data types to match, {} vs {}", - first_dtype, - s.data_type() - ))); + with_match_daft_types!(first_dtype, |$T| { + let downcasted = series.into_iter().map(|s| s.downcast::<<$T as DaftDataType>::ArrayType>()).collect::>>()?; + Ok(<$T as DaftDataType>::ArrayType::concat(downcasted.as_slice())?.into_series()) + }) } } - - with_match_daft_types!(first_dtype, |$T| { - let downcasted = series.into_iter().map(|s| s.downcast::<<$T as DaftDataType>::ArrayType>()).collect::>>()?; - Ok(<$T as DaftDataType>::ArrayType::concat(downcasted.as_slice())?.into_series()) - }) } } diff --git a/src/daft-core/src/series/ops/list.rs b/src/daft-core/src/series/ops/list.rs index d9a17dd087..81a4788067 100644 --- a/src/daft-core/src/series/ops/list.rs +++ b/src/daft-core/src/series/ops/list.rs @@ -7,6 +7,22 @@ use crate::{ }; impl Series { + pub fn list_value_counts(&self) -> DaftResult { + let series = match self.data_type() { + DataType::List(_) => self.list()?.value_counts(), + DataType::FixedSizeList(..) => self.fixed_size_list()?.value_counts(), + dt => { + return Err(DaftError::TypeError(format!( + "List contains not implemented for {}", + dt + ))) + } + }? + .into_series(); + + Ok(series) + } + pub fn explode(&self) -> DaftResult { match self.data_type() { DataType::List(_) => self.list()?.explode(), diff --git a/src/daft-core/src/series/ops/map.rs b/src/daft-core/src/series/ops/map.rs index b624cd8aac..d5f1452bee 100644 --- a/src/daft-core/src/series/ops/map.rs +++ b/src/daft-core/src/series/ops/map.rs @@ -4,12 +4,13 @@ use crate::{datatypes::DataType, series::Series}; impl Series { pub fn map_get(&self, key: &Self) -> DaftResult { - match self.data_type() { - DataType::Map(_) => self.map()?.map_get(key), - dt => Err(DaftError::TypeError(format!( + let DataType::Map { .. } = self.data_type() else { + return Err(DaftError::TypeError(format!( "map.get not implemented for {}", - dt - ))), - } + self.data_type() + ))); + }; + + self.map()?.map_get(key) } } diff --git a/src/daft-core/src/series/serdes.rs b/src/daft-core/src/series/serdes.rs index bf7e42a1e0..76414e30e6 100644 --- a/src/daft-core/src/series/serdes.rs +++ b/src/daft-core/src/series/serdes.rs @@ -158,12 +158,13 @@ impl<'d> serde::Deserialize<'d> for Series { DataType::Extension(..) => { let physical = map.next_value::()?; let physical = physical.to_arrow(); - let ext_array = physical.to_type(field.dtype.to_arrow().unwrap()); + let ext_array = + physical.convert_logical_type(field.dtype.to_arrow().unwrap()); Ok(ExtensionArray::new(Arc::new(field), ext_array) .unwrap() .into_series()) } - DataType::Map(..) => { + DataType::Map { .. } => { let physical = map.next_value::()?; Ok(MapArray::new( Arc::new(field), diff --git a/src/daft-dsl/src/functions/map/get.rs b/src/daft-dsl/src/functions/map/get.rs index ab6eb148f8..5465f08562 100644 --- a/src/daft-dsl/src/functions/map/get.rs +++ b/src/daft-dsl/src/functions/map/get.rs @@ -12,40 +12,36 @@ impl FunctionEvaluator for GetEvaluator { } fn to_field(&self, inputs: &[ExprRef], schema: &Schema, _: &FunctionExpr) -> DaftResult { - match inputs { - [input, key] => match (input.to_field(schema), key.to_field(schema)) { - (Ok(input_field), Ok(_)) => match input_field.dtype { - DataType::Map(inner) => match inner.as_ref() { - DataType::Struct(fields) if fields.len() == 2 => { - let value_dtype = &fields[1].dtype; - Ok(Field::new("value", value_dtype.clone())) - } - _ => Err(DaftError::TypeError(format!( - "Expected input map to have struct values with 2 fields, got {}", - inner - ))), - }, - _ => Err(DaftError::TypeError(format!( - "Expected input to be a map, got {}", - input_field.dtype - ))), - }, - (Err(e), _) | (_, Err(e)) => Err(e), - }, - _ => Err(DaftError::SchemaMismatch(format!( + let [input, key] = inputs else { + return Err(DaftError::SchemaMismatch(format!( "Expected 2 input args, got {}", inputs.len() - ))), - } + ))); + }; + + let input_field = input.to_field(schema)?; + let _ = key.to_field(schema)?; + + let DataType::Map { value, .. } = input_field.dtype else { + return Err(DaftError::TypeError(format!( + "Expected input to be a map, got {}", + input_field.dtype + ))); + }; + + let field = Field::new("value", *value); + + Ok(field) } fn evaluate(&self, inputs: &[Series], _: &FunctionExpr) -> DaftResult { - match inputs { - [input, key] => input.map_get(key), - _ => Err(DaftError::ValueError(format!( + let [input, key] = inputs else { + return Err(DaftError::ValueError(format!( "Expected 2 input args, got {}", inputs.len() - ))), - } + ))); + }; + + input.map_get(key) } } diff --git a/src/daft-functions/src/list/mod.rs b/src/daft-functions/src/list/mod.rs index 2ba3f197be..c0ad745b19 100644 --- a/src/daft-functions/src/list/mod.rs +++ b/src/daft-functions/src/list/mod.rs @@ -9,6 +9,7 @@ mod min; mod slice; mod sort; mod sum; +mod value_counts; pub use chunk::{list_chunk as chunk, ListChunk}; pub use count::{list_count as count, ListCount}; @@ -31,6 +32,10 @@ pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_function(wrap_pyfunction_bound!(count::py_list_count, parent)?)?; parent.add_function(wrap_pyfunction_bound!(get::py_list_get, parent)?)?; parent.add_function(wrap_pyfunction_bound!(join::py_list_join, parent)?)?; + parent.add_function(wrap_pyfunction_bound!( + value_counts::py_list_value_counts, + parent + )?)?; parent.add_function(wrap_pyfunction_bound!(max::py_list_max, parent)?)?; parent.add_function(wrap_pyfunction_bound!(min::py_list_min, parent)?)?; diff --git a/src/daft-functions/src/list/value_counts.rs b/src/daft-functions/src/list/value_counts.rs new file mode 100644 index 0000000000..d558db8ac4 --- /dev/null +++ b/src/daft-functions/src/list/value_counts.rs @@ -0,0 +1,72 @@ +use common_error::{DaftError, DaftResult}; +use daft_core::prelude::{DataType, Field, Schema, Series}; +#[cfg(feature = "python")] +use daft_dsl::python::PyExpr; +use daft_dsl::{ + functions::{ScalarFunction, ScalarUDF}, + ExprRef, +}; +#[cfg(feature = "python")] +use pyo3::{pyfunction, PyResult}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +struct ListValueCountsFunction; + +#[typetag::serde] +impl ScalarUDF for ListValueCountsFunction { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &'static str { + "list_value_counts" + } + + fn to_field(&self, inputs: &[ExprRef], schema: &Schema) -> DaftResult { + let [data] = inputs else { + return Err(DaftError::SchemaMismatch(format!( + "Expected 1 input arg, got {}", + inputs.len() + ))); + }; + + let data_field = data.to_field(schema)?; + + let DataType::List(inner_type) = &data_field.dtype else { + return Err(DaftError::TypeError(format!( + "Expected list, got {}", + data_field.dtype + ))); + }; + + let map_type = DataType::Map { + key: inner_type.clone(), + value: Box::new(DataType::UInt64), + }; + + Ok(Field::new(data_field.name, map_type)) + } + + fn evaluate(&self, inputs: &[Series]) -> DaftResult { + let [data] = inputs else { + return Err(DaftError::ValueError(format!( + "Expected 1 input arg, got {}", + inputs.len() + ))); + }; + + data.list_value_counts() + } +} + +pub fn list_value_counts(expr: ExprRef) -> ExprRef { + ScalarFunction::new(ListValueCountsFunction, vec![expr]).into() +} + +#[cfg(feature = "python")] +#[pyfunction] +#[pyo3(name = "list_value_counts")] +pub fn py_list_value_counts(expr: PyExpr) -> PyResult { + Ok(list_value_counts(expr.into()).into()) +} diff --git a/src/daft-micropartition/src/lib.rs b/src/daft-micropartition/src/lib.rs index 1a01f4e933..c677a0fd96 100644 --- a/src/daft-micropartition/src/lib.rs +++ b/src/daft-micropartition/src/lib.rs @@ -1,5 +1,6 @@ #![feature(let_chains)] #![feature(iterator_try_reduce)] +#![feature(iterator_try_collect)] use common_error::DaftError; use snafu::Snafu; diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 5b518419d1..2bd128a566 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -403,7 +403,7 @@ fn materialize_scan_task( .with_context(|_| PyIOSnafu) .map(Into::::into) }) - }, + } _ => unreachable!("PythonFunction file format must be paired with PythonFactoryFunction data file sources"), } }); @@ -709,9 +709,17 @@ impl MicroPartition { Ok(size_bytes) } + /// Retrieves tables from the MicroPartition, reading data if not already loaded. + /// + /// This method: + /// 1. Returns cached tables if already loaded. + /// 2. If unloaded, reads data from the source, caches it, and returns the new tables. + /// + /// "Reading if necessary" means I/O operations only occur for unloaded data, + /// optimizing performance by avoiding redundant reads. pub(crate) fn tables_or_read(&self, io_stats: IOStatsRef) -> crate::Result>> { let mut guard = self.state.lock().unwrap(); - match guard.deref() { + match &*guard { TableState::Unloaded(scan_task) => { let (tables, _) = materialize_scan_task(scan_task.clone(), Some(io_stats))?; let table_values = Arc::new(tables); diff --git a/src/daft-micropartition/src/ops/eval_expressions.rs b/src/daft-micropartition/src/ops/eval_expressions.rs index 8ac5966a2e..9b4ebc0834 100644 --- a/src/daft-micropartition/src/ops/eval_expressions.rs +++ b/src/daft-micropartition/src/ops/eval_expressions.rs @@ -33,16 +33,18 @@ impl MicroPartition { let io_stats = IOStatsContext::new("MicroPartition::eval_expression_list"); let expected_schema = infer_schema(exprs, &self.schema)?; + let tables = self.tables_or_read(io_stats)?; - let evaluated_tables = tables + + let evaluated_tables: Vec<_> = tables .iter() - .map(|t| t.eval_expression_list(exprs)) - .collect::>>()?; + .map(|table| table.eval_expression_list(exprs)) + .try_collect()?; let eval_stats = self .statistics .as_ref() - .map(|s| s.eval_expression_list(exprs, &expected_schema)) + .map(|table_statistics| table_statistics.eval_expression_list(exprs, &expected_schema)) .transpose()?; Ok(Self::new_loaded( diff --git a/src/daft-schema/src/dtype.rs b/src/daft-schema/src/dtype.rs index 65cf8f808e..1da8adc066 100644 --- a/src/daft-schema/src/dtype.rs +++ b/src/daft-schema/src/dtype.rs @@ -7,6 +7,8 @@ use serde::{Deserialize, Serialize}; use crate::{field::Field, image_mode::ImageMode, time_unit::TimeUnit}; +pub type DaftDataType = DataType; + #[derive(Clone, Debug, Display, PartialEq, Eq, Serialize, Deserialize, Hash)] pub enum DataType { // ArrowTypes: @@ -107,8 +109,11 @@ pub enum DataType { Struct(Vec), /// A nested [`DataType`] that is represented as List>. - #[display("Map[{_0}]")] - Map(Box), + #[display("Map[{key}: {value}]")] + Map { + key: Box, + value: Box, + }, /// Extension type. #[display("{_1}")] @@ -233,14 +238,30 @@ impl DataType { Self::List(field) => Ok(ArrowType::LargeList(Box::new( arrow2::datatypes::Field::new("item", field.to_arrow()?, true), ))), - Self::Map(field) => Ok(ArrowType::Map( - Box::new(arrow2::datatypes::Field::new( - "item", - field.to_arrow()?, - true, - )), - false, - )), + Self::Map { key, value } => { + let struct_type = ArrowType::Struct(vec![ + // We never allow null keys in maps for several reasons: + // 1. Null typically represents the absence of a value, which doesn't make sense for a key. + // 2. Null comparisons can be problematic (similar to how f64::NAN != f64::NAN). + // 3. It maintains consistency with common map implementations in arrow (no null keys). + // 4. It simplifies map operations + // + // This decision aligns with the thoughts of team members like Jay and Sammy, who argue that: + // - Nulls in keys could lead to unintuitive behavior + // - If users need to count or group by null values, they can use other constructs like + // group_by operations on non-map types, which offer more explicit control. + // + // By disallowing null keys, we encourage more robust data modeling practices and + // provide a clearer semantic meaning for map types in our system. + arrow2::datatypes::Field::new("key", key.to_arrow()?, true), + arrow2::datatypes::Field::new("value", value.to_arrow()?, true), + ]); + + let struct_field = + arrow2::datatypes::Field::new("entries", struct_type.clone(), true); + + Ok(ArrowType::map(struct_field, false)) + } Self::Struct(fields) => Ok({ let fields = fields .iter() @@ -288,7 +309,10 @@ impl DataType { FixedSizeList(child_dtype, size) => { FixedSizeList(Box::new(child_dtype.to_physical()), *size) } - Map(child_dtype) => List(Box::new(child_dtype.to_physical())), + Map { key, value } => List(Box::new(Struct(vec![ + Field::new("key", key.to_physical()), + Field::new("value", value.to_physical()), + ]))), Embedding(dtype, size) => FixedSizeList(Box::new(dtype.to_physical()), *size), Image(mode) => Struct(vec![ Field::new( @@ -328,20 +352,6 @@ impl DataType { } } - #[inline] - pub fn nested_dtype(&self) -> Option<&Self> { - match self { - Self::Map(dtype) - | Self::List(dtype) - | Self::FixedSizeList(dtype, _) - | Self::FixedShapeTensor(dtype, _) - | Self::SparseTensor(dtype) - | Self::FixedShapeSparseTensor(dtype, _) - | Self::Tensor(dtype) => Some(dtype), - _ => None, - } - } - #[inline] pub fn is_arrow(&self) -> bool { self.to_arrow().is_ok() @@ -350,21 +360,21 @@ impl DataType { #[inline] pub fn is_numeric(&self) -> bool { match self { - Self::Int8 - | Self::Int16 - | Self::Int32 - | Self::Int64 - | Self::Int128 - | Self::UInt8 - | Self::UInt16 - | Self::UInt32 - | Self::UInt64 - // DataType::Float16 - | Self::Float32 - | Self::Float64 => true, - Self::Extension(_, inner, _) => inner.is_numeric(), - _ => false - } + Self::Int8 + | Self::Int16 + | Self::Int32 + | Self::Int64 + | Self::Int128 + | Self::UInt8 + | Self::UInt16 + | Self::UInt32 + | Self::UInt64 + // DataType::Float16 + | Self::Float32 + | Self::Float64 => true, + Self::Extension(_, inner, _) => inner.is_numeric(), + _ => false + } } #[inline] @@ -453,7 +463,7 @@ impl DataType { #[inline] pub fn is_map(&self) -> bool { - matches!(self, Self::Map(..)) + matches!(self, Self::Map { .. }) } #[inline] @@ -576,7 +586,7 @@ impl DataType { | Self::FixedShapeTensor(..) | Self::SparseTensor(..) | Self::FixedShapeSparseTensor(..) - | Self::Map(..) + | Self::Map { .. } ) } @@ -590,7 +600,7 @@ impl DataType { let p: Self = self.to_physical(); matches!( p, - Self::List(..) | Self::FixedSizeList(..) | Self::Struct(..) | Self::Map(..) + Self::List(..) | Self::FixedSizeList(..) | Self::Struct(..) | Self::Map { .. } ) } @@ -607,7 +617,7 @@ impl DataType { impl From<&ArrowType> for DataType { fn from(item: &ArrowType) -> Self { - match item { + let result = match item { ArrowType::Null => Self::Null, ArrowType::Boolean => Self::Boolean, ArrowType::Int8 => Self::Int8, @@ -638,7 +648,29 @@ impl From<&ArrowType> for DataType { ArrowType::FixedSizeList(field, size) => { Self::FixedSizeList(Box::new(field.as_ref().data_type().into()), *size) } - ArrowType::Map(field, ..) => Self::Map(Box::new(field.as_ref().data_type().into())), + ArrowType::Map(field, ..) => { + // todo: TryFrom in future? want in second pass maybe + + // field should be a struct + let ArrowType::Struct(fields) = &field.data_type else { + panic!("Map should have a struct as its key") + }; + + let [key, value] = fields.as_slice() else { + panic!("Map should have two fields") + }; + + let key = &key.data_type; + let value = &value.data_type; + + let key = Self::from(key); + let value = Self::from(value); + + let key = Box::new(key); + let value = Box::new(value); + + Self::Map { key, value } + } ArrowType::Struct(fields) => { let fields: Vec = fields.iter().map(|fld| fld.into()).collect(); Self::Struct(fields) @@ -659,7 +691,9 @@ impl From<&ArrowType> for DataType { } _ => panic!("DataType :{item:?} is not supported"), - } + }; + + result } } diff --git a/src/daft-schema/src/field.rs b/src/daft-schema/src/field.rs index 774545fee4..f4cd6ecb16 100644 --- a/src/daft-schema/src/field.rs +++ b/src/daft-schema/src/field.rs @@ -18,6 +18,7 @@ pub struct Field { } pub type FieldRef = Arc; +pub type DaftField = Field; #[derive(Clone, Display, Debug, PartialEq, Eq, Deserialize, Serialize, Hash)] #[display("{id}")] @@ -87,6 +88,14 @@ impl Field { ) } + pub fn to_physical(&self) -> Self { + Self { + name: self.name.clone(), + dtype: self.dtype.to_physical(), + metadata: self.metadata.clone(), + } + } + pub fn rename>(&self, name: S) -> Self { Self { name: name.into(), diff --git a/src/daft-schema/src/python/datatype.rs b/src/daft-schema/src/python/datatype.rs index ceff5e18f3..edacbfbdad 100644 --- a/src/daft-schema/src/python/datatype.rs +++ b/src/daft-schema/src/python/datatype.rs @@ -209,10 +209,10 @@ impl PyDataType { #[staticmethod] pub fn map(key_type: Self, value_type: Self) -> PyResult { - Ok(DataType::Map(Box::new(DataType::Struct(vec![ - Field::new("key", key_type.dtype), - Field::new("value", value_type.dtype), - ]))) + Ok(DataType::Map { + key: Box::new(key_type.dtype), + value: Box::new(value_type.dtype), + } .into()) } diff --git a/src/daft-schema/src/schema.rs b/src/daft-schema/src/schema.rs index 04c0d88c71..c721bfab64 100644 --- a/src/daft-schema/src/schema.rs +++ b/src/daft-schema/src/schema.rs @@ -29,15 +29,19 @@ pub struct Schema { impl Schema { pub fn new(fields: Vec) -> DaftResult { - let mut map: IndexMap = indexmap::IndexMap::new(); - - for f in fields.into_iter() { - let old = map.insert(f.name.clone(), f); - if let Some(item) = old { - return Err(DaftError::ValueError(format!( - "Attempting to make a Schema with duplicate field names: {}", - item.name - ))); + let mut map = IndexMap::new(); + + for f in fields { + match map.entry(f.name.clone()) { + indexmap::map::Entry::Vacant(entry) => { + entry.insert(f); + } + indexmap::map::Entry::Occupied(entry) => { + return Err(DaftError::ValueError(format!( + "Attempting to make a Schema with duplicate field names: {}", + entry.key() + ))); + } } } diff --git a/src/daft-sql/src/planner.rs b/src/daft-sql/src/planner.rs index 1be5b724a9..c5957f8d83 100644 --- a/src/daft-sql/src/planner.rs +++ b/src/daft-sql/src/planner.rs @@ -992,7 +992,7 @@ impl SQLPlanner { invalid_operation_err!("Index must be a string literal") } } - DataType::Map(_) => Ok(daft_dsl::functions::map::get(expr, index)), + DataType::Map { .. } => Ok(daft_dsl::functions::map::get(expr, index)), dtype => { invalid_operation_err!("nested access on column with type: {}", dtype) } diff --git a/src/daft-stats/src/column_stats/mod.rs b/src/daft-stats/src/column_stats/mod.rs index df96daa373..d72ba7cb9c 100644 --- a/src/daft-stats/src/column_stats/mod.rs +++ b/src/daft-stats/src/column_stats/mod.rs @@ -71,7 +71,7 @@ impl ColumnRangeStatistics { // UNSUPPORTED TYPES: // Types that don't support comparisons and can't be used as ColumnRangeStatistics - DataType::List(..) | DataType::FixedSizeList(..) | DataType::Image(..) | DataType::FixedShapeImage(..) | DataType::Tensor(..) | DataType::SparseTensor(..) | DataType::FixedShapeSparseTensor(..) | DataType::FixedShapeTensor(..) | DataType::Struct(..) | DataType::Map(..) | DataType::Extension(..) | DataType::Embedding(..) | DataType::Unknown => false, + DataType::List(..) | DataType::FixedSizeList(..) | DataType::Image(..) | DataType::FixedShapeImage(..) | DataType::Tensor(..) | DataType::SparseTensor(..) | DataType::FixedShapeSparseTensor(..) | DataType::FixedShapeTensor(..) | DataType::Struct(..) | DataType::Map { .. } | DataType::Extension(..) | DataType::Embedding(..) | DataType::Unknown => false, #[cfg(feature = "python")] DataType::Python => false, } diff --git a/src/daft-stats/src/table_stats.rs b/src/daft-stats/src/table_stats.rs index 0fff747c98..660f03e4cf 100644 --- a/src/daft-stats/src/table_stats.rs +++ b/src/daft-stats/src/table_stats.rs @@ -119,13 +119,13 @@ impl TableStatistics { Expr::Alias(col, _) => self.eval_expression(col.as_ref()), Expr::Column(col_name) => { let col = self.columns.get(col_name.as_ref()); - if let Some(col) = col { - Ok(col.clone()) - } else { - Err(crate::Error::DaftCoreCompute { + let Some(col) = col else { + return Err(crate::Error::DaftCoreCompute { source: DaftError::FieldNotFound(col_name.to_string()), - }) - } + }); + }; + + Ok(col.clone()) } Expr::Literal(lit_value) => lit_value.try_into(), Expr::Not(col) => self.eval_expression(col)?.not(), @@ -194,7 +194,6 @@ impl Display for TableStatistics { #[cfg(test)] mod test { - use daft_core::prelude::*; use daft_dsl::{col, lit}; use daft_table::Table; diff --git a/src/daft-table/src/ffi.rs b/src/daft-table/src/ffi.rs index 37a118c50e..424ea516f2 100644 --- a/src/daft-table/src/ffi.rs +++ b/src/daft-table/src/ffi.rs @@ -42,9 +42,9 @@ pub fn record_batches_to_table( let columns = cols .into_iter() .enumerate() - .map(|(i, c)| { - let c = cast_array_for_daft_if_needed(c); - Series::try_from((names.get(i).unwrap().as_str(), c)) + .map(|(i, array)| { + let cast_array = cast_array_for_daft_if_needed(array); + Series::try_from((names.get(i).unwrap().as_str(), cast_array)) }) .collect::>>()?; tables.push(Table::new_with_size(schema.clone(), columns, num_rows)?) diff --git a/src/daft-table/src/lib.rs b/src/daft-table/src/lib.rs index 3669fda3f5..6f87fd6d49 100644 --- a/src/daft-table/src/lib.rs +++ b/src/daft-table/src/lib.rs @@ -1,5 +1,6 @@ #![feature(hash_raw_entry)] #![feature(let_chains)] +#![feature(iterator_try_collect)] use core::slice; use std::{ @@ -495,6 +496,7 @@ impl Table { fn eval_expression(&self, expr: &Expr) -> DaftResult { use crate::Expr::*; + let expected_field = expr.to_field(self.schema.as_ref())?; let series = match expr { Alias(child, name) => Ok(self.eval_expression(child)?.rename(name)), @@ -572,6 +574,7 @@ impl Table { } }, }?; + if expected_field.name != series.field().name { return Err(DaftError::ComputeError(format!( "Mismatch of expected expression name and name from computed series ({} vs {}) for expression: {expr}", @@ -579,32 +582,42 @@ impl Table { series.field().name ))); } + if expected_field.dtype != series.field().dtype { - panic!("Mismatch of expected expression data type and data type from computed series, {} vs {}", expected_field.dtype, series.field().dtype); + panic!( + "Data type mismatch in expression evaluation:\n\ + Expected type: {}\n\ + Computed type: {}\n\ + Expression: {}\n\ + This likely indicates an internal error in type inference or computation.", + expected_field.dtype, + series.field().dtype, + expr + ); } Ok(series) } pub fn eval_expression_list(&self, exprs: &[ExprRef]) -> DaftResult { - let result_series = exprs + let result_series: Vec<_> = exprs .iter() .map(|e| self.eval_expression(e)) - .collect::>>()?; + .try_collect()?; - let fields = result_series - .iter() - .map(|s| s.field().clone()) - .collect::>(); - let mut seen: HashSet = HashSet::new(); - for field in fields.iter() { + let fields: Vec<_> = result_series.iter().map(|s| s.field().clone()).collect(); + + let mut seen = HashSet::new(); + + for field in &fields { let name = &field.name; if seen.contains(name) { return Err(DaftError::ValueError(format!( "Duplicate name found when evaluating expressions: {name}" ))); } - seen.insert(name.clone()); + seen.insert(name); } + let new_schema = Schema::new(fields)?; let has_agg_expr = exprs.iter().any(|e| matches!(e.as_ref(), Expr::Agg(..))); diff --git a/src/daft-table/src/repr_html.rs b/src/daft-table/src/repr_html.rs index 79ecaf063a..0e46bb80b2 100644 --- a/src/daft-table/src/repr_html.rs +++ b/src/daft-table/src/repr_html.rs @@ -102,7 +102,7 @@ pub fn html_value(s: &Series, idx: usize) -> String { let arr = s.struct_().unwrap(); arr.html_value(idx) } - DataType::Map(_) => { + DataType::Map { .. } => { let arr = s.map().unwrap(); arr.html_value(idx) } diff --git a/tests/expressions/test_expressions.py b/tests/expressions/test_expressions.py index abae55386b..755abf5502 100644 --- a/tests/expressions/test_expressions.py +++ b/tests/expressions/test_expressions.py @@ -6,6 +6,7 @@ import pytest import pytz +import daft from daft.datatype import DataType, TimeUnit from daft.expressions import col, lit from daft.expressions.testing import expr_structurally_equal @@ -535,3 +536,77 @@ def test_repr_series_lit() -> None: s = lit(Series.from_pylist([1, 2, 3])) output = repr(s) assert output == "lit([1, 2, 3])" + + +def test_list_value_counts(): + # Create a MicroPartition with a list column + mp = MicroPartition.from_pydict( + {"list_col": [["a", "b", "a", "c"], ["b", "b", "c"], ["a", "a", "a"], [], ["d", None, "d"]]} + ) + + # Apply list_value_counts operation + result = mp.eval_expression_list([col("list_col").list.value_counts().alias("value_counts")]) + value_counts = result.to_pydict()["value_counts"] + + # Expected output + expected = [[("a", 2), ("b", 1), ("c", 1)], [("b", 2), ("c", 1)], [("a", 3)], [], [("d", 2)]] + + # Check the result + assert value_counts == expected + + # Test with empty input (no proper type -> should raise error) + empty_mp = MicroPartition.from_pydict({"list_col": []}) + with pytest.raises(ValueError): + empty_mp.eval_expression_list([col("list_col").list.value_counts().alias("value_counts")]) + + # Test with empty input (no proper type -> should raise error) + none_mp = MicroPartition.from_pydict({"list_col": [None, None, None]}) + with pytest.raises(ValueError): + none_mp.eval_expression_list([col("list_col").list.value_counts().alias("value_counts")]) + + +def test_list_value_counts_nested(): + # Create a MicroPartition with a nested list column + mp = MicroPartition.from_pydict( + { + "nested_list_col": [ + [[1, 2], [3, 4]], + [[1, 2], [5, 6]], + [[3, 4], [1, 2]], + [], + None, + [[1, 2], [1, 2]], + ] + } + ) + + # Apply list_value_counts operation and expect an exception + with pytest.raises(daft.exceptions.DaftCoreException) as exc_info: + mp.eval_expression_list([col("nested_list_col").list.value_counts().alias("value_counts")]) + + # Check the exception message + assert ( + 'DaftError::ArrowError Invalid argument error: The data type type LargeList(Field { name: "item", data_type: Int64, is_nullable: true, metadata: {} }) has no natural order' + in str(exc_info.value) + ) + + +def test_list_value_counts_degenerate(): + import pyarrow as pa + + # Create a MicroPartition with an empty list column of specified type + empty_mp = MicroPartition.from_pydict({"empty_list_col": pa.array([], type=pa.list_(pa.string()))}) + + # Apply list_value_counts operation + result = empty_mp.eval_expression_list([col("empty_list_col").list.value_counts().alias("value_counts")]) + + # Check the result + assert result.to_pydict() == {"value_counts": []} + + # Test with null values + null_mp = MicroPartition.from_pydict({"null_list_col": pa.array([None, None], type=pa.list_(pa.string()))}) + + result_null = null_mp.eval_expression_list([col("null_list_col").list.value_counts().alias("value_counts")]) + + # Check the result for null values + assert result_null.to_pydict() == {"value_counts": [[], []]} diff --git a/tests/integration/iceberg/test_table_load.py b/tests/integration/iceberg/test_table_load.py index 2252f446d6..5cc1f2bf7d 100644 --- a/tests/integration/iceberg/test_table_load.py +++ b/tests/integration/iceberg/test_table_load.py @@ -24,7 +24,7 @@ def test_daft_iceberg_table_open(local_iceberg_tables): WORKING_SHOW_COLLECT = [ - "test_all_types", + # "test_all_types", # Commented out due to issue https://github.com/Eventual-Inc/Daft/issues/2996 "test_limit", "test_null_nan", "test_null_nan_rewritten", diff --git a/tests/io/test_parquet_roundtrip.py b/tests/io/test_parquet_roundtrip.py index 6904805831..292c5b98e1 100644 --- a/tests/io/test_parquet_roundtrip.py +++ b/tests/io/test_parquet_roundtrip.py @@ -112,15 +112,31 @@ def test_roundtrip_temporal_arrow_types(tmp_path, data, pa_type, expected_dtype) def test_roundtrip_tensor_types(tmp_path): - expected_dtype = DataType.tensor(DataType.int64()) - data = [np.array([[1, 2], [3, 4]]), None, None] - before = daft.from_pydict({"foo": Series.from_pylist(data)}) - before = before.concat(before) - before.write_parquet(str(tmp_path)) - after = daft.read_parquet(str(tmp_path)) - assert before.schema()["foo"].dtype == expected_dtype - assert after.schema()["foo"].dtype == expected_dtype - assert before.to_arrow() == after.to_arrow() + # Define the expected data type for the tensor column + expected_tensor_dtype = DataType.tensor(DataType.int64()) + + # Create sample tensor data with some null values + tensor_data = [np.array([[1, 2], [3, 4]]), None, None] + + # Create a Daft DataFrame with the tensor data + df_original = daft.from_pydict({"tensor_col": Series.from_pylist(tensor_data)}) + + # Double the size of the DataFrame to ensure we test with more data + df_original = df_original.concat(df_original) + + assert df_original.schema()["tensor_col"].dtype == expected_tensor_dtype + + # Write the DataFrame to a Parquet file + df_original.write_parquet(str(tmp_path)) + + # Read the Parquet file back into a new DataFrame + df_roundtrip = daft.read_parquet(str(tmp_path)) + + # Verify that the data type is preserved after the roundtrip + assert df_roundtrip.schema()["tensor_col"].dtype == expected_tensor_dtype + + # Ensure the data content is identical after the roundtrip + assert df_original.to_arrow() == df_roundtrip.to_arrow() @pytest.mark.parametrize("fixed_shape", [True, False]) diff --git a/tests/table/map/test_map_get.py b/tests/table/map/test_map_get.py index 6ab7a31ab8..9c7548d1e3 100644 --- a/tests/table/map/test_map_get.py +++ b/tests/table/map/test_map_get.py @@ -49,7 +49,8 @@ def test_map_get_logical_type(): ) table = MicroPartition.from_arrow(pa.table({"map_col": data})) - result = table.eval_expression_list([col("map_col").map.get("foo")]) + map = col("map_col").map + result = table.eval_expression_list([map.get("foo")]) assert result.to_pydict() == {"value": [datetime.date(2022, 1, 1), datetime.date(2022, 1, 2), None]}