From 8181461348757560a4f4b71891f10ae98a7b96ff Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Mon, 23 Oct 2023 21:09:13 -0700 Subject: [PATCH 1/6] implement series serde --- daft/daft.pyi | 3 + daft/series.py | 7 + src/daft-core/src/array/boolean.rs | 9 + src/daft-core/src/array/from.rs | 26 +++ src/daft-core/src/array/from_iter.rs | 45 +++++ src/daft-core/src/array/mod.rs | 16 +- src/daft-core/src/array/serdes.rs | 170 +++++++++++++++++ src/daft-core/src/datatypes/mod.rs | 5 +- src/daft-core/src/python/series.rs | 23 ++- src/daft-core/src/series/mod.rs | 2 +- src/daft-core/src/series/serdes.rs | 275 +++++++++++++++++++++++++++ tests/series/test_series.py | 11 ++ 12 files changed, 583 insertions(+), 9 deletions(-) create mode 100644 src/daft-core/src/array/boolean.rs create mode 100644 src/daft-core/src/array/from_iter.rs create mode 100644 src/daft-core/src/array/serdes.rs create mode 100644 src/daft-core/src/series/serdes.rs diff --git a/daft/daft.pyi b/daft/daft.pyi index e0a3b644a3..27d0ff50ff 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -665,6 +665,9 @@ class PySeries: def image_resize(self, w: int, h: int) -> PySeries: ... def if_else(self, other: PySeries, predicate: PySeries) -> PySeries: ... def is_null(self) -> PySeries: ... + def _debug_bincode_serialize(self) -> bytes: ... + @staticmethod + def _debug_bincode_deserialize(b: bytes) -> PySeries: ... class PyTable: def schema(self) -> PySchema: ... diff --git a/daft/series.py b/daft/series.py index ece133c060..a450367a8e 100644 --- a/daft/series.py +++ b/daft/series.py @@ -527,6 +527,13 @@ def _from_arrow_table_to_series(cls, table: pa.Table, extension_type: pa.Extensi array = extension_type.wrap_array(array) return cls.from_arrow(array, name) + def _debug_bincode_serialize(self) -> bytes: + return self._series._debug_bincode_serialize() + + @classmethod + def _debug_bincode_deserialize(cls, b: bytes) -> Series: + return Series._from_pyseries(PySeries._debug_bincode_deserialize(b)) + SomeSeriesNamespace = TypeVar("SomeSeriesNamespace", bound="SeriesNamespace") diff --git a/src/daft-core/src/array/boolean.rs b/src/daft-core/src/array/boolean.rs new file mode 100644 index 0000000000..7d53c076da --- /dev/null +++ b/src/daft-core/src/array/boolean.rs @@ -0,0 +1,9 @@ +use crate::datatypes::BooleanArray; + +use super::ops::as_arrow::AsArrow; + +impl BooleanArray { + pub fn as_bitmap(&self) -> &arrow2::bitmap::Bitmap { + self.as_arrow().values() + } +} diff --git a/src/daft-core/src/array/from.rs b/src/daft-core/src/array/from.rs index e51b127471..6ad4655fa0 100644 --- a/src/daft-core/src/array/from.rs +++ b/src/daft-core/src/array/from.rs @@ -6,6 +6,7 @@ use crate::datatypes::{ }; use crate::array::DataArray; +use arrow2::bitmap::Bitmap; use common_error::{DaftError, DaftResult}; impl From<(&str, Box>)> @@ -63,6 +64,16 @@ impl From<(&str, &[bool])> for BooleanArray { } } +impl From<(&str, &[Option])> for BooleanArray { + fn from(item: (&str, &[Option])) -> Self { + let (name, slice) = item; + let arrow_array = Box::new(arrow2::array::BooleanArray::from_trusted_len_iter( + slice.iter().cloned(), + )); + DataArray::new(Field::new(name, DataType::Boolean).into(), arrow_array).unwrap() + } +} + impl From<(&str, arrow2::array::BooleanArray)> for BooleanArray { fn from(item: (&str, arrow2::array::BooleanArray)) -> Self { let (name, arrow_array) = item; @@ -74,6 +85,21 @@ impl From<(&str, arrow2::array::BooleanArray)> for BooleanArray { } } +impl From<(&str, arrow2::bitmap::Bitmap)> for BooleanArray { + fn from(item: (&str, arrow2::bitmap::Bitmap)) -> Self { + let (name, bitmap) = item; + DataArray::new( + Field::new(name, DataType::Boolean).into(), + Box::new(arrow2::array::BooleanArray::new( + arrow2::datatypes::DataType::Boolean, + bitmap, + None, + )), + ) + .unwrap() + } +} + impl From<(&str, Box)> for BooleanArray { fn from(item: (&str, Box)) -> Self { let (name, arrow_array) = item; diff --git a/src/daft-core/src/array/from_iter.rs b/src/daft-core/src/array/from_iter.rs new file mode 100644 index 0000000000..19664f0b5a --- /dev/null +++ b/src/daft-core/src/array/from_iter.rs @@ -0,0 +1,45 @@ +use arrow2::datatypes; + +use crate::datatypes::{BinaryArray, DaftNumericType, Field, Utf8Array}; + +use super::DataArray; + +impl DataArray +where + T: DaftNumericType, +{ + pub fn from_iter( + name: &str, + iter: impl Iterator> + arrow2::trusted_len::TrustedLen, + ) -> Self { + let arrow_array = + Box::new(arrow2::array::PrimitiveArray::::from_trusted_len_iter(iter)); + DataArray::new(Field::new(name, T::get_dtype()).into(), arrow_array).unwrap() + } +} + +impl Utf8Array { + pub fn from_iter>( + name: &str, + iter: impl Iterator> + arrow2::trusted_len::TrustedLen, + ) -> Self { + let arrow_array = Box::new(arrow2::array::Utf8Array::::from_trusted_len_iter(iter)); + DataArray::new(Field::new(name, crate::DataType::Utf8).into(), arrow_array).unwrap() + } +} + +impl BinaryArray { + pub fn from_iter>( + name: &str, + iter: impl Iterator> + arrow2::trusted_len::TrustedLen, + ) -> Self { + let arrow_array = Box::new(arrow2::array::BinaryArray::::from_trusted_len_iter( + iter, + )); + DataArray::new( + Field::new(name, crate::DataType::Binary).into(), + arrow_array, + ) + .unwrap() + } +} diff --git a/src/daft-core/src/array/mod.rs b/src/daft-core/src/array/mod.rs index 2b23671987..dffad96a41 100644 --- a/src/daft-core/src/array/mod.rs +++ b/src/daft-core/src/array/mod.rs @@ -1,22 +1,30 @@ +mod fixed_size_list_array; pub mod from; pub mod growable; pub mod iterator; +mod list_array; pub mod ops; pub mod pseudo_arrow; - -mod fixed_size_list_array; -mod list_array; +mod serdes; mod struct_array; pub use fixed_size_list_array::FixedSizeListArray; pub use list_array::ListArray; +use serde::ser::SerializeStruct; pub use struct_array::StructArray; +mod boolean; +mod from_iter; use std::{marker::PhantomData, sync::Arc}; -use crate::datatypes::{DaftArrayType, DaftPhysicalType, DataType, Field}; +use crate::{ + datatypes::{DaftArrayType, DaftArrowBackedType, DaftPhysicalType, DataType, Field}, + with_match_physical_daft_types, +}; use common_error::{DaftError, DaftResult}; +use self::ops::as_arrow::AsArrow; + #[derive(Debug)] pub struct DataArray { pub field: Arc, diff --git a/src/daft-core/src/array/serdes.rs b/src/daft-core/src/array/serdes.rs new file mode 100644 index 0000000000..0de19023a7 --- /dev/null +++ b/src/daft-core/src/array/serdes.rs @@ -0,0 +1,170 @@ +use serde::ser::{SerializeMap, SerializeStruct}; + +use crate::{ + datatypes::{ + logical::LogicalArray, BinaryArray, BooleanArray, DaftLogicalType, DaftNumericType, + ExtensionArray, Int64Array, NullArray, PythonArray, Utf8Array, + }, + IntoSeries, Series, +}; + +use super::{ops::as_arrow::AsArrow, DataArray, FixedSizeListArray, ListArray, StructArray}; + +impl serde::Serialize for DataArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + s.serialize_entry("values", &self.as_arrow().iter().collect::>())?; + s.end() + } +} + +impl serde::Serialize for Utf8Array { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + s.serialize_entry("values", &self.as_arrow().iter().collect::>())?; + s.end() + } +} + +impl serde::Serialize for BooleanArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + s.serialize_entry("values", &self.as_arrow().iter().collect::>())?; + s.end() + } +} + +impl serde::Serialize for BinaryArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + s.serialize_entry("values", &self.as_arrow().iter().collect::>())?; + s.end() + } +} + +impl serde::Serialize for NullArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + s.serialize_entry("values", &self.len())?; + s.end() + } +} + +impl serde::Serialize for ExtensionArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + + let values = Series::try_from(("values", self.data.clone())) + .unwrap() + .as_physical() + .unwrap(); + s.serialize_entry("values", &values)?; + s.end() + } +} + +#[cfg(feature = "python")] +impl serde::Serialize for PythonArray { + fn serialize(&self, _serializer: S) -> Result + where + S: serde::Serializer, + { + panic!("Rust Serde is not implemented for Python Arrays") + } +} + +impl serde::Serialize for StructArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + + let mut values = Vec::with_capacity(self.children.len() + 1); + let validity = self + .validity() + .map(|b| BooleanArray::from(("validity", b.clone())).into_series()); + values.push(validity.as_ref()); + values.extend(self.children.iter().map(|c| Some(c))); + s.serialize_entry("field", self.field.as_ref())?; + s.serialize_entry("values", &values)?; + s.end() + } +} + +impl serde::Serialize for ListArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + + let arrow2_offsets = arrow2::array::Int64Array::new( + arrow2::datatypes::DataType::Int64, + self.offsets().buffer().clone(), + self.validity().cloned(), + ); + let offsets = &Int64Array::from(("validity", Box::new(arrow2_offsets))).into_series(); + let values = vec![offsets, &self.flat_child]; + s.serialize_entry("field", self.field.as_ref())?; + s.serialize_entry("values", &values)?; + s.end() + } +} + +impl serde::Serialize for FixedSizeListArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + + let validity = self + .validity() + .map(|b| BooleanArray::from(("validity", b.clone())).into_series()); + let values = vec![validity.as_ref(), Some(&self.flat_child)]; + s.serialize_entry("field", self.field.as_ref())?; + s.serialize_entry("values", &values)?; + s.end() + } +} + +impl serde::Serialize for LogicalArray +where + <::PhysicalType as crate::datatypes::DaftDataType>::ArrayType: + serde::Serialize + IntoSeries, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field.as_ref())?; + s.serialize_entry("values", &self.physical.clone().into_series())?; + s.end() + } +} diff --git a/src/daft-core/src/datatypes/mod.rs b/src/daft-core/src/datatypes/mod.rs index 305877b860..04c4b58f2c 100644 --- a/src/daft-core/src/datatypes/mod.rs +++ b/src/daft-core/src/datatypes/mod.rs @@ -6,8 +6,6 @@ mod image_mode; mod matching; mod time_unit; -use std::ops::{Add, Div, Mul, Rem, Sub}; - pub use crate::array::{DataArray, FixedSizeListArray}; use crate::array::{ListArray, StructArray}; use arrow2::{ @@ -21,6 +19,8 @@ pub use field::FieldID; pub use image_format::ImageFormat; pub use image_mode::ImageMode; use num_traits::{Bounded, Float, FromPrimitive, Num, NumCast, ToPrimitive, Zero}; +use serde::{Deserialize, Serialize}; +use std::ops::{Add, Div, Mul, Rem, Sub}; pub use time_unit::TimeUnit; pub mod logical; @@ -201,6 +201,7 @@ pub trait NumericNative: + Bounded + FromPrimitive + ToPrimitive + + Serialize { type DAFTTYPE: DaftNumericType; } diff --git a/src/daft-core/src/python/series.rs b/src/daft-core/src/python/series.rs index f3c8cb3fa0..16109a4bab 100644 --- a/src/daft-core/src/python/series.rs +++ b/src/daft-core/src/python/series.rs @@ -1,6 +1,14 @@ -use std::ops::{Add, Div, Mul, Rem, Sub}; +use std::{ + borrow::Cow, + ops::{Add, Div, Mul, Rem, Sub}, +}; -use pyo3::{exceptions::PyValueError, prelude::*, pyclass::CompareOp, types::PyList}; +use pyo3::{ + exceptions::PyValueError, + prelude::*, + pyclass::CompareOp, + types::{PyBytes, PyList}, +}; use crate::{ array::{ops::DaftLogical, pseudo_arrow::PseudoArrowArray, DataArray}, @@ -316,6 +324,17 @@ impl PySeries { pub fn is_null(&self) -> PyResult { Ok(self.series.is_null()?.into()) } + + pub fn _debug_bincode_serialize(&self, py: Python) -> PyResult { + let values = bincode::serialize(&self.series).unwrap(); + Ok(PyBytes::new(py, &values).to_object(py)) + } + + #[staticmethod] + pub fn _debug_bincode_deserialize(bytes: &PyBytes) -> PyResult { + let values = bincode::deserialize::(bytes.as_bytes()).unwrap(); + Ok(Self { series: values }) + } } impl From for PySeries { diff --git a/src/daft-core/src/series/mod.rs b/src/daft-core/src/series/mod.rs index 27be057048..40cfe5b5f2 100644 --- a/src/daft-core/src/series/mod.rs +++ b/src/daft-core/src/series/mod.rs @@ -1,8 +1,8 @@ mod array_impl; mod from; mod ops; +mod serdes; mod series_like; - use std::{ fmt::{Display, Formatter, Result}, sync::Arc, diff --git a/src/daft-core/src/series/serdes.rs b/src/daft-core/src/series/serdes.rs new file mode 100644 index 0000000000..a601a041fd --- /dev/null +++ b/src/daft-core/src/series/serdes.rs @@ -0,0 +1,275 @@ +use std::{borrow::Cow, sync::Arc}; + +use arrow2::offset::OffsetsBuffer; +use serde::{de::Visitor, Deserializer}; + +use crate::{ + array::{ + ops::{as_arrow::AsArrow, full::FullNull}, + ListArray, StructArray, + }, + datatypes::logical::{ + DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray, + FixedShapeTensorArray, ImageArray, TensorArray, TimestampArray, + }, + with_match_daft_types, DataType, IntoSeries, Series, +}; + +impl serde::Serialize for Series { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + with_match_daft_types!(self.data_type(), |$T| { + let array = self.downcast::<<$T as DaftDataType>::ArrayType>().unwrap(); + array.serialize(serializer) + }) + } +} + +impl<'d> serde::Deserialize<'d> for Series { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'d>, + { + const EXPECTED_KEYS: &[&str] = &["field", "values"]; + + struct SeriesVisitor; + + impl<'d> Visitor<'d> for SeriesVisitor { + type Value = Series; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("struct {field: , values: }") + } + + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'d>, + { + let mut field: Option = None; + let mut values_set = false; + while let Some(key) = map.next_key::>().unwrap() { + match key.as_ref() { + "field" => { + field = Some(map.next_value()?); + } + "values" => { + values_set = true; + break; + } + unknown => { + return Err(serde::de::Error::unknown_field(unknown, EXPECTED_KEYS)) + } + } + } + let field = field.unwrap(); + use crate::datatypes::*; + use DataType::*; + match &field.dtype { + Null => Ok(NullArray::full_null( + &field.name, + &field.dtype, + map.next_value::()?, + ) + .into_series()), + Boolean => Ok(BooleanArray::from(( + field.name.as_str(), + map.next_value::>>()?.as_slice(), + )) + .into_series()), + Int8 => Ok(Int8Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Int16 => Ok(Int16Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Int32 => Ok(Int32Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Int64 => Ok(Int64Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Int128 => Ok(Int128Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + UInt8 => Ok(UInt8Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + UInt16 => Ok(UInt16Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + UInt32 => Ok(UInt32Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + UInt64 => Ok(UInt64Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Float32 => Ok(Float32Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Float64 => Ok(Float64Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Utf8 => Ok(Utf8Array::from_iter( + field.name.as_str(), + map.next_value::>>>()?.into_iter(), + ) + .into_series()), + Binary => Ok(BinaryArray::from_iter( + field.name.as_str(), + map.next_value::>>>()?.into_iter(), + ) + .into_series()), + Extension(..) => { + let physical = map.next_value::().unwrap().to_arrow(); + Ok(ExtensionArray::new(Arc::new(field), physical) + .unwrap() + .into_series()) + } + Struct(..) => { + let mut all_series = map.next_value::>>()?; + let validity = all_series.pop().unwrap(); + let children = all_series + .into_iter() + .map(|s| s.unwrap()) + .collect::>(); + + let validity = validity.map(|v| v.bool().unwrap().as_bitmap().clone()); + Ok(StructArray::new(Arc::new(field), children, validity).into_series()) + } + List(..) => { + let mut all_series = map.next_value::>()?; + let offsets_series = all_series.pop().unwrap(); + let offsets_array = offsets_series.i64().unwrap(); + let validity = offsets_array.data().validity().cloned(); + let offsets = OffsetsBuffer::::try_from( + offsets_array.as_arrow().values().clone(), + ) + .unwrap(); + let flat_child = all_series.pop().unwrap(); + Ok(ListArray::new(field, flat_child, offsets, validity).into_series()) + } + FixedSizeList(..) => { + let mut all_series = map.next_value::>>()?; + let validity = all_series.pop().unwrap(); + let flat_child = all_series.pop().unwrap().unwrap(); + + let validity = validity.map(|v| v.bool().unwrap().as_bitmap().clone()); + Ok(FixedSizeListArray::new(field, flat_child, validity).into_series()) + } + Decimal128(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::().unwrap(); + Ok(Decimal128Array::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series()) + } + Timestamp(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::().unwrap(); + Ok(TimestampArray::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series()) + } + Date => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::().unwrap(); + Ok( + DateArray::new(field, physical.downcast::().unwrap().clone()) + .into_series(), + ) + } + Time(..) => panic!("Time Deserialization not implemented"), + Duration(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::().unwrap(); + Ok( + DurationArray::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series(), + ) + } + Embedding(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::().unwrap(); + Ok(EmbeddingArray::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series()) + } + Image(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::().unwrap(); + Ok( + ImageArray::new(field, physical.downcast::().unwrap().clone()) + .into_series(), + ) + } + FixedShapeImage(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::().unwrap(); + Ok(FixedShapeImageArray::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series()) + } + FixedShapeTensor(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::().unwrap(); + Ok(FixedShapeTensorArray::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series()) + } + Tensor(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::().unwrap(); + Ok( + TensorArray::new(field, physical.downcast::().unwrap().clone()) + .into_series(), + ) + } + Python => { + panic!("python deserialization not implemented for rust Serde"); + } + Unknown => { + panic!("Unable to deserialize Unknown DataType"); + } + } + } + } + deserializer.deserialize_map(SeriesVisitor) + } +} diff --git a/tests/series/test_series.py b/tests/series/test_series.py index 7104ba891f..a3c54c53c3 100644 --- a/tests/series/test_series.py +++ b/tests/series/test_series.py @@ -128,3 +128,14 @@ def test_series_pickling(dtype) -> None: copied_s = copy.deepcopy(s) assert s.datatype() == copied_s.datatype() assert s.to_pylist() == copied_s.to_pylist() + + +@pytest.mark.parametrize("dtype", ARROW_FLOAT_TYPES + ARROW_INT_TYPES + ARROW_STRING_TYPES) +def test_series_bincode_serdes(dtype) -> None: + s = Series.from_pylist([1, 2, 3, None]).cast(DataType.from_arrow_type(dtype)) + serialized = s._debug_bincode_serialize() + copied_s = Series._debug_bincode_deserialize(serialized) + + assert s.name() == copied_s.name() + assert s.datatype() == copied_s.datatype() + assert s.to_pylist() == copied_s.to_pylist() From 02d32cf6bac4e8ce6e8546ff3c027ff81c8fa30b Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Mon, 23 Oct 2023 21:12:53 -0700 Subject: [PATCH 2/6] clippy fixes --- src/daft-core/src/array/from.rs | 2 +- src/daft-core/src/array/from_iter.rs | 2 -- src/daft-core/src/array/mod.rs | 9 ++------- src/daft-core/src/array/serdes.rs | 9 ++++++--- src/daft-core/src/datatypes/mod.rs | 2 +- src/daft-core/src/python/series.rs | 5 +---- src/daft-core/src/series/serdes.rs | 5 ++++- src/daft-csv/src/metadata.rs | 21 +++++++-------------- src/daft-csv/src/read.rs | 16 +++++----------- 9 files changed, 27 insertions(+), 44 deletions(-) diff --git a/src/daft-core/src/array/from.rs b/src/daft-core/src/array/from.rs index 6ad4655fa0..9d6de70a6a 100644 --- a/src/daft-core/src/array/from.rs +++ b/src/daft-core/src/array/from.rs @@ -6,7 +6,7 @@ use crate::datatypes::{ }; use crate::array::DataArray; -use arrow2::bitmap::Bitmap; + use common_error::{DaftError, DaftResult}; impl From<(&str, Box>)> diff --git a/src/daft-core/src/array/from_iter.rs b/src/daft-core/src/array/from_iter.rs index 19664f0b5a..7fb3b5ad21 100644 --- a/src/daft-core/src/array/from_iter.rs +++ b/src/daft-core/src/array/from_iter.rs @@ -1,5 +1,3 @@ -use arrow2::datatypes; - use crate::datatypes::{BinaryArray, DaftNumericType, Field, Utf8Array}; use super::DataArray; diff --git a/src/daft-core/src/array/mod.rs b/src/daft-core/src/array/mod.rs index dffad96a41..d38769eb6d 100644 --- a/src/daft-core/src/array/mod.rs +++ b/src/daft-core/src/array/mod.rs @@ -9,22 +9,17 @@ mod serdes; mod struct_array; pub use fixed_size_list_array::FixedSizeListArray; pub use list_array::ListArray; -use serde::ser::SerializeStruct; + pub use struct_array::StructArray; mod boolean; mod from_iter; use std::{marker::PhantomData, sync::Arc}; -use crate::{ - datatypes::{DaftArrayType, DaftArrowBackedType, DaftPhysicalType, DataType, Field}, - with_match_physical_daft_types, -}; +use crate::datatypes::{DaftArrayType, DaftPhysicalType, DataType, Field}; use common_error::{DaftError, DaftResult}; -use self::ops::as_arrow::AsArrow; - #[derive(Debug)] pub struct DataArray { pub field: Arc, diff --git a/src/daft-core/src/array/serdes.rs b/src/daft-core/src/array/serdes.rs index 0de19023a7..ed4b83487e 100644 --- a/src/daft-core/src/array/serdes.rs +++ b/src/daft-core/src/array/serdes.rs @@ -1,13 +1,16 @@ -use serde::ser::{SerializeMap, SerializeStruct}; +use serde::ser::SerializeMap; use crate::{ datatypes::{ logical::LogicalArray, BinaryArray, BooleanArray, DaftLogicalType, DaftNumericType, - ExtensionArray, Int64Array, NullArray, PythonArray, Utf8Array, + ExtensionArray, Int64Array, NullArray, Utf8Array, }, IntoSeries, Series, }; +#[cfg(feature = "python")] +use crate::datatypes::PythonArray; + use super::{ops::as_arrow::AsArrow, DataArray, FixedSizeListArray, ListArray, StructArray}; impl serde::Serialize for DataArray { @@ -109,7 +112,7 @@ impl serde::Serialize for StructArray { .validity() .map(|b| BooleanArray::from(("validity", b.clone())).into_series()); values.push(validity.as_ref()); - values.extend(self.children.iter().map(|c| Some(c))); + values.extend(self.children.iter().map(Some)); s.serialize_entry("field", self.field.as_ref())?; s.serialize_entry("values", &values)?; s.end() diff --git a/src/daft-core/src/datatypes/mod.rs b/src/daft-core/src/datatypes/mod.rs index 04c4b58f2c..9d1d614131 100644 --- a/src/daft-core/src/datatypes/mod.rs +++ b/src/daft-core/src/datatypes/mod.rs @@ -19,7 +19,7 @@ pub use field::FieldID; pub use image_format::ImageFormat; pub use image_mode::ImageMode; use num_traits::{Bounded, Float, FromPrimitive, Num, NumCast, ToPrimitive, Zero}; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use std::ops::{Add, Div, Mul, Rem, Sub}; pub use time_unit::TimeUnit; diff --git a/src/daft-core/src/python/series.rs b/src/daft-core/src/python/series.rs index 16109a4bab..3c54a0ee04 100644 --- a/src/daft-core/src/python/series.rs +++ b/src/daft-core/src/python/series.rs @@ -1,7 +1,4 @@ -use std::{ - borrow::Cow, - ops::{Add, Div, Mul, Rem, Sub}, -}; +use std::ops::{Add, Div, Mul, Rem, Sub}; use pyo3::{ exceptions::PyValueError, diff --git a/src/daft-core/src/series/serdes.rs b/src/daft-core/src/series/serdes.rs index a601a041fd..b35a90c5c9 100644 --- a/src/daft-core/src/series/serdes.rs +++ b/src/daft-core/src/series/serdes.rs @@ -63,7 +63,10 @@ impl<'d> serde::Deserialize<'d> for Series { } } } - let field = field.unwrap(); + if !values_set { + return Err(serde::de::Error::missing_field("values")); + } + let field = field.ok_or_else(|| serde::de::Error::missing_field("name"))?; use crate::datatypes::*; use DataType::*; match &field.dtype { diff --git a/src/daft-csv/src/metadata.rs b/src/daft-csv/src/metadata.rs index 0e4fcd7b33..3a67af421a 100644 --- a/src/daft-csv/src/metadata.rs +++ b/src/daft-csv/src/metadata.rs @@ -327,8 +327,7 @@ mod tests { Field::new("petal.length", DataType::Float64), Field::new("petal.width", DataType::Float64), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 328); assert_eq!(num_records_read, 20); @@ -363,8 +362,7 @@ mod tests { Field::new("petal.length", DataType::Float64), Field::new("petal.width", DataType::Float64), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 328); assert_eq!(num_records_read, 20); @@ -409,8 +407,7 @@ mod tests { Field::new("column_3", DataType::Float64), Field::new("column_4", DataType::Float64), Field::new("column_5", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 328); assert_eq!(num_records_read, 20); @@ -439,8 +436,7 @@ mod tests { Field::new("petal.length", DataType::Float64), Field::new("petal.width", DataType::Float64), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 49); assert_eq!(num_records_read, 3); @@ -466,8 +462,7 @@ mod tests { Field::new("petal.length", DataType::Float64), Field::new("petal.width", DataType::Float64), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 82); assert_eq!(num_records_read, 6); @@ -497,8 +492,7 @@ mod tests { Field::new("petal.length", DataType::Utf8), Field::new("petal.width", DataType::Utf8), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 33); assert_eq!(num_records_read, 2); @@ -530,8 +524,7 @@ mod tests { Field::new("petal.length", DataType::Float64), Field::new("petal.width", DataType::Float64), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); // Max bytes doesn't include header, so add 15 bytes to upper bound. assert!(total_bytes_read <= 100 + 15, "{}", total_bytes_read); diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 693a2d0cdb..303b07423d 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -462,19 +462,13 @@ mod tests { .unwrap(); let (mut fields, _) = infer_schema(&mut reader, None, has_header, &infer).unwrap(); if !has_header && let Some(column_names) = column_names { - fields = fields.into_iter().zip(column_names.into_iter()).map(|(field, name)| arrow2::datatypes::Field::new(name, field.data_type, true).with_metadata(field.metadata)).collect::>(); + fields = fields.into_iter().zip(column_names).map(|(field, name)| arrow2::datatypes::Field::new(name, field.data_type, true).with_metadata(field.metadata)).collect::>(); } let mut rows = vec![ByteRecord::default(); limit.unwrap_or(100)]; let rows_read = read_rows(&mut reader, 0, &mut rows).unwrap(); let rows = &rows[..rows_read]; - let chunk = deserialize_batch( - rows, - &fields, - projection.as_ref().map(|p| p.as_slice()), - 0, - deserialize_column, - ) - .unwrap(); + let chunk = + deserialize_batch(rows, &fields, projection.as_deref(), 0, deserialize_column).unwrap(); if let Some(projection) = projection { fields = projection .into_iter() @@ -1216,7 +1210,7 @@ mod tests { let column_names = vec!["a", "b"]; let table = read_csv( - file.as_ref(), + file, Some(column_names.clone()), None, None, @@ -1254,7 +1248,7 @@ mod tests { let column_names = vec!["a", "b"]; let table = read_csv( - file.as_ref(), + file, Some(column_names.clone()), Some(vec!["b"]), None, From 8f4c0bbc4c929bbd2d1cf83e02e2fbb55b596ba6 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Mon, 23 Oct 2023 22:14:04 -0700 Subject: [PATCH 3/6] fixes for extension types, struct and list --- src/daft-core/src/array/serdes.rs | 33 +++++++++++++++-------- src/daft-core/src/series/serdes.rs | 15 ++++++----- tests/series/test_series.py | 43 ++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 17 deletions(-) diff --git a/src/daft-core/src/array/serdes.rs b/src/daft-core/src/array/serdes.rs index ed4b83487e..1911f00ff0 100644 --- a/src/daft-core/src/array/serdes.rs +++ b/src/daft-core/src/array/serdes.rs @@ -5,7 +5,7 @@ use crate::{ logical::LogicalArray, BinaryArray, BooleanArray, DaftLogicalType, DaftNumericType, ExtensionArray, Int64Array, NullArray, Utf8Array, }, - IntoSeries, Series, + DataType, IntoSeries, Series, }; #[cfg(feature = "python")] @@ -80,11 +80,11 @@ impl serde::Serialize for ExtensionArray { { let mut s = serializer.serialize_map(Some(2))?; s.serialize_entry("field", self.field())?; - - let values = Series::try_from(("values", self.data.clone())) - .unwrap() - .as_physical() - .unwrap(); + let values = if let DataType::Extension(_, inner, _) = self.data_type() { + Series::try_from(("physical", self.data.to_type(inner.to_arrow().unwrap()))).unwrap() + } else { + panic!("Expected Extension Type!") + }; s.serialize_entry("values", &values)?; s.end() } @@ -108,11 +108,13 @@ impl serde::Serialize for StructArray { let mut s = serializer.serialize_map(Some(2))?; let mut values = Vec::with_capacity(self.children.len() + 1); + values.extend(self.children.iter().map(Some)); + let validity = self .validity() .map(|b| BooleanArray::from(("validity", b.clone())).into_series()); values.push(validity.as_ref()); - values.extend(self.children.iter().map(Some)); + s.serialize_entry("field", self.field.as_ref())?; s.serialize_entry("values", &values)?; s.end() @@ -125,14 +127,23 @@ impl serde::Serialize for ListArray { S: serde::Serializer, { let mut s = serializer.serialize_map(Some(2))?; + let mut values = Vec::with_capacity(3); + + values.push(Some(&self.flat_child)); let arrow2_offsets = arrow2::array::Int64Array::new( arrow2::datatypes::DataType::Int64, self.offsets().buffer().clone(), - self.validity().cloned(), + None, ); - let offsets = &Int64Array::from(("validity", Box::new(arrow2_offsets))).into_series(); - let values = vec![offsets, &self.flat_child]; + let offsets = Int64Array::from(("offsets", Box::new(arrow2_offsets))).into_series(); + values.push(Some(&offsets)); + + let validity = self + .validity() + .map(|b| BooleanArray::from(("validity", b.clone())).into_series()); + values.push(validity.as_ref()); + s.serialize_entry("field", self.field.as_ref())?; s.serialize_entry("values", &values)?; s.end() @@ -149,7 +160,7 @@ impl serde::Serialize for FixedSizeListArray { let validity = self .validity() .map(|b| BooleanArray::from(("validity", b.clone())).into_series()); - let values = vec![validity.as_ref(), Some(&self.flat_child)]; + let values = vec![Some(&self.flat_child), validity.as_ref()]; s.serialize_entry("field", self.field.as_ref())?; s.serialize_entry("values", &values)?; s.end() diff --git a/src/daft-core/src/series/serdes.rs b/src/daft-core/src/series/serdes.rs index b35a90c5c9..b2a637a716 100644 --- a/src/daft-core/src/series/serdes.rs +++ b/src/daft-core/src/series/serdes.rs @@ -147,8 +147,10 @@ impl<'d> serde::Deserialize<'d> for Series { ) .into_series()), Extension(..) => { - let physical = map.next_value::().unwrap().to_arrow(); - Ok(ExtensionArray::new(Arc::new(field), physical) + let physical = map.next_value::().unwrap(); + let physical = physical.to_arrow(); + let ext_array = physical.to_type(field.dtype.to_arrow().unwrap()); + Ok(ExtensionArray::new(Arc::new(field), ext_array) .unwrap() .into_series()) } @@ -164,15 +166,16 @@ impl<'d> serde::Deserialize<'d> for Series { Ok(StructArray::new(Arc::new(field), children, validity).into_series()) } List(..) => { - let mut all_series = map.next_value::>()?; - let offsets_series = all_series.pop().unwrap(); + let mut all_series = map.next_value::>>()?; + let validity = all_series.pop().unwrap(); + let validity = validity.map(|v| v.bool().unwrap().as_bitmap().clone()); + let offsets_series = all_series.pop().unwrap().unwrap(); let offsets_array = offsets_series.i64().unwrap(); - let validity = offsets_array.data().validity().cloned(); let offsets = OffsetsBuffer::::try_from( offsets_array.as_arrow().values().clone(), ) .unwrap(); - let flat_child = all_series.pop().unwrap(); + let flat_child = all_series.pop().unwrap().unwrap(); Ok(ListArray::new(field, flat_child, offsets, validity).into_series()) } FixedSizeList(..) => { diff --git a/tests/series/test_series.py b/tests/series/test_series.py index a3c54c53c3..962b192763 100644 --- a/tests/series/test_series.py +++ b/tests/series/test_series.py @@ -2,7 +2,9 @@ import copy from collections import Counter +from datetime import date, datetime +import numpy as np import pyarrow as pa import pytest @@ -139,3 +141,44 @@ def test_series_bincode_serdes(dtype) -> None: assert s.name() == copied_s.name() assert s.datatype() == copied_s.datatype() assert s.to_pylist() == copied_s.to_pylist() + + +@pytest.mark.parametrize( + "data", + [ + [{"a": "foo", "b": "bar"}, None, {"b": "baz", "c": "quux"}], + [[1, 2, 3], None, [4, 5]], + [datetime.now(), None, datetime.now()], + [date.today(), date.today(), None], + ], +) +def test_series_bincode_serdes_on_data(data) -> None: + s = Series.from_arrow(pa.array(data)) + serialized = s._debug_bincode_serialize() + copied_s = Series._debug_bincode_deserialize(serialized) + + assert s.name() == copied_s.name() + assert s.datatype() == copied_s.datatype() + assert s.to_pylist() == copied_s.to_pylist() + + +def test_series_bincode_serdes_on_ext_type(uuid_ext_type) -> None: + arr = pa.array(f"{i}".encode() for i in range(5)) + data = pa.ExtensionArray.from_storage(uuid_ext_type, arr) + s = Series.from_arrow(data) + serialized = s._debug_bincode_serialize() + copied_s = Series._debug_bincode_deserialize(serialized) + + assert s.name() == copied_s.name() + assert s.datatype() == copied_s.datatype() + assert s.to_pylist() == copied_s.to_pylist() + + +def test_series_bincode_serdes_on_complex_types() -> None: + s = Series.from_pylist([np.ones((4, 5)), np.ones((2, 10))]) + serialized = s._debug_bincode_serialize() + copied_s = Series._debug_bincode_deserialize(serialized) + + assert s.name() == copied_s.name() + assert s.datatype() == copied_s.datatype() + assert all(np.all(l == r) for l, r in zip(s.to_pylist(), copied_s.to_pylist())) From 46a694bac8db84a45970fd98f98df7760141670d Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Mon, 23 Oct 2023 22:19:35 -0700 Subject: [PATCH 4/6] better error handling --- src/daft-core/src/series/serdes.rs | 49 +++++++++++++++++++----------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/src/daft-core/src/series/serdes.rs b/src/daft-core/src/series/serdes.rs index b2a637a716..8d9370a7cf 100644 --- a/src/daft-core/src/series/serdes.rs +++ b/src/daft-core/src/series/serdes.rs @@ -49,7 +49,7 @@ impl<'d> serde::Deserialize<'d> for Series { { let mut field: Option = None; let mut values_set = false; - while let Some(key) = map.next_key::>().unwrap() { + while let Some(key) = map.next_key::>()? { match key.as_ref() { "field" => { field = Some(map.next_value()?); @@ -147,7 +147,7 @@ impl<'d> serde::Deserialize<'d> for Series { ) .into_series()), Extension(..) => { - let physical = map.next_value::().unwrap(); + let physical = map.next_value::()?; let physical = physical.to_arrow(); let ext_array = physical.to_type(field.dtype.to_arrow().unwrap()); Ok(ExtensionArray::new(Arc::new(field), ext_array) @@ -156,7 +156,9 @@ impl<'d> serde::Deserialize<'d> for Series { } Struct(..) => { let mut all_series = map.next_value::>>()?; - let validity = all_series.pop().unwrap(); + let validity = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("validity"))?; let children = all_series .into_iter() .map(|s| s.unwrap()) @@ -167,28 +169,41 @@ impl<'d> serde::Deserialize<'d> for Series { } List(..) => { let mut all_series = map.next_value::>>()?; - let validity = all_series.pop().unwrap(); + let validity = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("validity"))?; let validity = validity.map(|v| v.bool().unwrap().as_bitmap().clone()); - let offsets_series = all_series.pop().unwrap().unwrap(); + let offsets_series = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("offsets"))? + .unwrap(); let offsets_array = offsets_series.i64().unwrap(); let offsets = OffsetsBuffer::::try_from( offsets_array.as_arrow().values().clone(), ) .unwrap(); - let flat_child = all_series.pop().unwrap().unwrap(); + let flat_child = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("flat_child"))? + .unwrap(); Ok(ListArray::new(field, flat_child, offsets, validity).into_series()) } FixedSizeList(..) => { let mut all_series = map.next_value::>>()?; - let validity = all_series.pop().unwrap(); - let flat_child = all_series.pop().unwrap().unwrap(); + let validity = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("validity"))?; + let flat_child = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("flat_child"))? + .unwrap(); let validity = validity.map(|v| v.bool().unwrap().as_bitmap().clone()); Ok(FixedSizeListArray::new(field, flat_child, validity).into_series()) } Decimal128(..) => { type PType = <::PhysicalType as DaftDataType>::ArrayType; - let physical = map.next_value::().unwrap(); + let physical = map.next_value::()?; Ok(Decimal128Array::new( field, physical.downcast::().unwrap().clone(), @@ -197,7 +212,7 @@ impl<'d> serde::Deserialize<'d> for Series { } Timestamp(..) => { type PType = <::PhysicalType as DaftDataType>::ArrayType; - let physical = map.next_value::().unwrap(); + let physical = map.next_value::()?; Ok(TimestampArray::new( field, physical.downcast::().unwrap().clone(), @@ -206,7 +221,7 @@ impl<'d> serde::Deserialize<'d> for Series { } Date => { type PType = <::PhysicalType as DaftDataType>::ArrayType; - let physical = map.next_value::().unwrap(); + let physical = map.next_value::()?; Ok( DateArray::new(field, physical.downcast::().unwrap().clone()) .into_series(), @@ -215,7 +230,7 @@ impl<'d> serde::Deserialize<'d> for Series { Time(..) => panic!("Time Deserialization not implemented"), Duration(..) => { type PType = <::PhysicalType as DaftDataType>::ArrayType; - let physical = map.next_value::().unwrap(); + let physical = map.next_value::()?; Ok( DurationArray::new( field, @@ -226,7 +241,7 @@ impl<'d> serde::Deserialize<'d> for Series { } Embedding(..) => { type PType = <::PhysicalType as DaftDataType>::ArrayType; - let physical = map.next_value::().unwrap(); + let physical = map.next_value::()?; Ok(EmbeddingArray::new( field, physical.downcast::().unwrap().clone(), @@ -235,7 +250,7 @@ impl<'d> serde::Deserialize<'d> for Series { } Image(..) => { type PType = <::PhysicalType as DaftDataType>::ArrayType; - let physical = map.next_value::().unwrap(); + let physical = map.next_value::()?; Ok( ImageArray::new(field, physical.downcast::().unwrap().clone()) .into_series(), @@ -243,7 +258,7 @@ impl<'d> serde::Deserialize<'d> for Series { } FixedShapeImage(..) => { type PType = <::PhysicalType as DaftDataType>::ArrayType; - let physical = map.next_value::().unwrap(); + let physical = map.next_value::()?; Ok(FixedShapeImageArray::new( field, physical.downcast::().unwrap().clone(), @@ -252,7 +267,7 @@ impl<'d> serde::Deserialize<'d> for Series { } FixedShapeTensor(..) => { type PType = <::PhysicalType as DaftDataType>::ArrayType; - let physical = map.next_value::().unwrap(); + let physical = map.next_value::()?; Ok(FixedShapeTensorArray::new( field, physical.downcast::().unwrap().clone(), @@ -261,7 +276,7 @@ impl<'d> serde::Deserialize<'d> for Series { } Tensor(..) => { type PType = <::PhysicalType as DaftDataType>::ArrayType; - let physical = map.next_value::().unwrap(); + let physical = map.next_value::()?; Ok( TensorArray::new(field, physical.downcast::().unwrap().clone()) .into_series(), From 88edd4621d2f94bb7afeffa3d5e5df79bd416041 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Mon, 23 Oct 2023 22:24:01 -0700 Subject: [PATCH 5/6] use lazy iterator --- src/daft-core/src/array/serdes.rs | 48 ++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/src/daft-core/src/array/serdes.rs b/src/daft-core/src/array/serdes.rs index 1911f00ff0..7541b6ff36 100644 --- a/src/daft-core/src/array/serdes.rs +++ b/src/daft-core/src/array/serdes.rs @@ -1,3 +1,5 @@ +use std::cell::RefCell; + use serde::ser::SerializeMap; use crate::{ @@ -13,6 +15,44 @@ use crate::datatypes::PythonArray; use super::{ops::as_arrow::AsArrow, DataArray, FixedSizeListArray, ListArray, StructArray}; +// adapted from Polars Serdes iterator +pub struct IterSer +where + I: IntoIterator, + ::Item: serde::Serialize, +{ + iter: RefCell>, +} + +impl IterSer +where + I: IntoIterator, + ::Item: serde::Serialize, +{ + fn new(iter: I) -> Self { + IterSer { + iter: RefCell::new(Some(iter)), + } + } +} + +impl serde::Serialize for IterSer +where + I: IntoIterator, + ::Item: serde::Serialize, +{ + fn serialize( + &self, + serializer: S, + ) -> std::result::Result<::Ok, ::Error> + where + S: serde::Serializer, + { + let iter: I = self.iter.borrow_mut().take().unwrap(); + serializer.collect_seq(iter) + } +} + impl serde::Serialize for DataArray { fn serialize(&self, serializer: S) -> Result where @@ -20,7 +60,7 @@ impl serde::Serialize for DataArray { { let mut s = serializer.serialize_map(Some(2))?; s.serialize_entry("field", self.field())?; - s.serialize_entry("values", &self.as_arrow().iter().collect::>())?; + s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?; s.end() } } @@ -32,7 +72,7 @@ impl serde::Serialize for Utf8Array { { let mut s = serializer.serialize_map(Some(2))?; s.serialize_entry("field", self.field())?; - s.serialize_entry("values", &self.as_arrow().iter().collect::>())?; + s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?; s.end() } } @@ -44,7 +84,7 @@ impl serde::Serialize for BooleanArray { { let mut s = serializer.serialize_map(Some(2))?; s.serialize_entry("field", self.field())?; - s.serialize_entry("values", &self.as_arrow().iter().collect::>())?; + s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?; s.end() } } @@ -56,7 +96,7 @@ impl serde::Serialize for BinaryArray { { let mut s = serializer.serialize_map(Some(2))?; s.serialize_entry("field", self.field())?; - s.serialize_entry("values", &self.as_arrow().iter().collect::>())?; + s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?; s.end() } } From 37e3127845a52eb1c0e7839697ec8d40d109ff72 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Mon, 23 Oct 2023 22:25:19 -0700 Subject: [PATCH 6/6] add null test --- tests/series/test_series.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/series/test_series.py b/tests/series/test_series.py index 962b192763..a56034b6fa 100644 --- a/tests/series/test_series.py +++ b/tests/series/test_series.py @@ -182,3 +182,13 @@ def test_series_bincode_serdes_on_complex_types() -> None: assert s.name() == copied_s.name() assert s.datatype() == copied_s.datatype() assert all(np.all(l == r) for l, r in zip(s.to_pylist(), copied_s.to_pylist())) + + +def test_series_bincode_serdes_on_null_types() -> None: + s = Series.from_pylist([None, None, None]) + serialized = s._debug_bincode_serialize() + copied_s = Series._debug_bincode_deserialize(serialized) + + assert s.name() == copied_s.name() + assert s.datatype() == copied_s.datatype() + assert s.to_pylist() == copied_s.to_pylist()