diff --git a/daft/daft.pyi b/daft/daft.pyi index e0a3b644a3..27d0ff50ff 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -665,6 +665,9 @@ class PySeries: def image_resize(self, w: int, h: int) -> PySeries: ... def if_else(self, other: PySeries, predicate: PySeries) -> PySeries: ... def is_null(self) -> PySeries: ... + def _debug_bincode_serialize(self) -> bytes: ... + @staticmethod + def _debug_bincode_deserialize(b: bytes) -> PySeries: ... class PyTable: def schema(self) -> PySchema: ... diff --git a/daft/series.py b/daft/series.py index ece133c060..a450367a8e 100644 --- a/daft/series.py +++ b/daft/series.py @@ -527,6 +527,13 @@ def _from_arrow_table_to_series(cls, table: pa.Table, extension_type: pa.Extensi array = extension_type.wrap_array(array) return cls.from_arrow(array, name) + def _debug_bincode_serialize(self) -> bytes: + return self._series._debug_bincode_serialize() + + @classmethod + def _debug_bincode_deserialize(cls, b: bytes) -> Series: + return Series._from_pyseries(PySeries._debug_bincode_deserialize(b)) + SomeSeriesNamespace = TypeVar("SomeSeriesNamespace", bound="SeriesNamespace") diff --git a/src/daft-core/src/array/boolean.rs b/src/daft-core/src/array/boolean.rs new file mode 100644 index 0000000000..7d53c076da --- /dev/null +++ b/src/daft-core/src/array/boolean.rs @@ -0,0 +1,9 @@ +use crate::datatypes::BooleanArray; + +use super::ops::as_arrow::AsArrow; + +impl BooleanArray { + pub fn as_bitmap(&self) -> &arrow2::bitmap::Bitmap { + self.as_arrow().values() + } +} diff --git a/src/daft-core/src/array/from.rs b/src/daft-core/src/array/from.rs index e51b127471..9d6de70a6a 100644 --- a/src/daft-core/src/array/from.rs +++ b/src/daft-core/src/array/from.rs @@ -6,6 +6,7 @@ use crate::datatypes::{ }; use crate::array::DataArray; + use common_error::{DaftError, DaftResult}; impl From<(&str, Box>)> @@ -63,6 +64,16 @@ impl From<(&str, &[bool])> for BooleanArray { } } +impl From<(&str, &[Option])> for BooleanArray { + fn from(item: (&str, &[Option])) -> Self { + let (name, slice) = item; + let arrow_array = Box::new(arrow2::array::BooleanArray::from_trusted_len_iter( + slice.iter().cloned(), + )); + DataArray::new(Field::new(name, DataType::Boolean).into(), arrow_array).unwrap() + } +} + impl From<(&str, arrow2::array::BooleanArray)> for BooleanArray { fn from(item: (&str, arrow2::array::BooleanArray)) -> Self { let (name, arrow_array) = item; @@ -74,6 +85,21 @@ impl From<(&str, arrow2::array::BooleanArray)> for BooleanArray { } } +impl From<(&str, arrow2::bitmap::Bitmap)> for BooleanArray { + fn from(item: (&str, arrow2::bitmap::Bitmap)) -> Self { + let (name, bitmap) = item; + DataArray::new( + Field::new(name, DataType::Boolean).into(), + Box::new(arrow2::array::BooleanArray::new( + arrow2::datatypes::DataType::Boolean, + bitmap, + None, + )), + ) + .unwrap() + } +} + impl From<(&str, Box)> for BooleanArray { fn from(item: (&str, Box)) -> Self { let (name, arrow_array) = item; diff --git a/src/daft-core/src/array/from_iter.rs b/src/daft-core/src/array/from_iter.rs new file mode 100644 index 0000000000..7fb3b5ad21 --- /dev/null +++ b/src/daft-core/src/array/from_iter.rs @@ -0,0 +1,43 @@ +use crate::datatypes::{BinaryArray, DaftNumericType, Field, Utf8Array}; + +use super::DataArray; + +impl DataArray +where + T: DaftNumericType, +{ + pub fn from_iter( + name: &str, + iter: impl Iterator> + arrow2::trusted_len::TrustedLen, + ) -> Self { + let arrow_array = + Box::new(arrow2::array::PrimitiveArray::::from_trusted_len_iter(iter)); + DataArray::new(Field::new(name, T::get_dtype()).into(), arrow_array).unwrap() + } +} + +impl Utf8Array { + pub fn from_iter>( + name: &str, + iter: impl Iterator> + arrow2::trusted_len::TrustedLen, + ) -> Self { + let arrow_array = Box::new(arrow2::array::Utf8Array::::from_trusted_len_iter(iter)); + DataArray::new(Field::new(name, crate::DataType::Utf8).into(), arrow_array).unwrap() + } +} + +impl BinaryArray { + pub fn from_iter>( + name: &str, + iter: impl Iterator> + arrow2::trusted_len::TrustedLen, + ) -> Self { + let arrow_array = Box::new(arrow2::array::BinaryArray::::from_trusted_len_iter( + iter, + )); + DataArray::new( + Field::new(name, crate::DataType::Binary).into(), + arrow_array, + ) + .unwrap() + } +} diff --git a/src/daft-core/src/array/mod.rs b/src/daft-core/src/array/mod.rs index 2b23671987..d38769eb6d 100644 --- a/src/daft-core/src/array/mod.rs +++ b/src/daft-core/src/array/mod.rs @@ -1,15 +1,18 @@ +mod fixed_size_list_array; pub mod from; pub mod growable; pub mod iterator; +mod list_array; pub mod ops; pub mod pseudo_arrow; - -mod fixed_size_list_array; -mod list_array; +mod serdes; mod struct_array; pub use fixed_size_list_array::FixedSizeListArray; pub use list_array::ListArray; + pub use struct_array::StructArray; +mod boolean; +mod from_iter; use std::{marker::PhantomData, sync::Arc}; diff --git a/src/daft-core/src/array/serdes.rs b/src/daft-core/src/array/serdes.rs new file mode 100644 index 0000000000..7541b6ff36 --- /dev/null +++ b/src/daft-core/src/array/serdes.rs @@ -0,0 +1,224 @@ +use std::cell::RefCell; + +use serde::ser::SerializeMap; + +use crate::{ + datatypes::{ + logical::LogicalArray, BinaryArray, BooleanArray, DaftLogicalType, DaftNumericType, + ExtensionArray, Int64Array, NullArray, Utf8Array, + }, + DataType, IntoSeries, Series, +}; + +#[cfg(feature = "python")] +use crate::datatypes::PythonArray; + +use super::{ops::as_arrow::AsArrow, DataArray, FixedSizeListArray, ListArray, StructArray}; + +// adapted from Polars Serdes iterator +pub struct IterSer +where + I: IntoIterator, + ::Item: serde::Serialize, +{ + iter: RefCell>, +} + +impl IterSer +where + I: IntoIterator, + ::Item: serde::Serialize, +{ + fn new(iter: I) -> Self { + IterSer { + iter: RefCell::new(Some(iter)), + } + } +} + +impl serde::Serialize for IterSer +where + I: IntoIterator, + ::Item: serde::Serialize, +{ + fn serialize( + &self, + serializer: S, + ) -> std::result::Result<::Ok, ::Error> + where + S: serde::Serializer, + { + let iter: I = self.iter.borrow_mut().take().unwrap(); + serializer.collect_seq(iter) + } +} + +impl serde::Serialize for DataArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?; + s.end() + } +} + +impl serde::Serialize for Utf8Array { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?; + s.end() + } +} + +impl serde::Serialize for BooleanArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?; + s.end() + } +} + +impl serde::Serialize for BinaryArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?; + s.end() + } +} + +impl serde::Serialize for NullArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + s.serialize_entry("values", &self.len())?; + s.end() + } +} + +impl serde::Serialize for ExtensionArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field())?; + let values = if let DataType::Extension(_, inner, _) = self.data_type() { + Series::try_from(("physical", self.data.to_type(inner.to_arrow().unwrap()))).unwrap() + } else { + panic!("Expected Extension Type!") + }; + s.serialize_entry("values", &values)?; + s.end() + } +} + +#[cfg(feature = "python")] +impl serde::Serialize for PythonArray { + fn serialize(&self, _serializer: S) -> Result + where + S: serde::Serializer, + { + panic!("Rust Serde is not implemented for Python Arrays") + } +} + +impl serde::Serialize for StructArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + + let mut values = Vec::with_capacity(self.children.len() + 1); + values.extend(self.children.iter().map(Some)); + + let validity = self + .validity() + .map(|b| BooleanArray::from(("validity", b.clone())).into_series()); + values.push(validity.as_ref()); + + s.serialize_entry("field", self.field.as_ref())?; + s.serialize_entry("values", &values)?; + s.end() + } +} + +impl serde::Serialize for ListArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + let mut values = Vec::with_capacity(3); + + values.push(Some(&self.flat_child)); + + let arrow2_offsets = arrow2::array::Int64Array::new( + arrow2::datatypes::DataType::Int64, + self.offsets().buffer().clone(), + None, + ); + let offsets = Int64Array::from(("offsets", Box::new(arrow2_offsets))).into_series(); + values.push(Some(&offsets)); + + let validity = self + .validity() + .map(|b| BooleanArray::from(("validity", b.clone())).into_series()); + values.push(validity.as_ref()); + + s.serialize_entry("field", self.field.as_ref())?; + s.serialize_entry("values", &values)?; + s.end() + } +} + +impl serde::Serialize for FixedSizeListArray { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + + let validity = self + .validity() + .map(|b| BooleanArray::from(("validity", b.clone())).into_series()); + let values = vec![Some(&self.flat_child), validity.as_ref()]; + s.serialize_entry("field", self.field.as_ref())?; + s.serialize_entry("values", &values)?; + s.end() + } +} + +impl serde::Serialize for LogicalArray +where + <::PhysicalType as crate::datatypes::DaftDataType>::ArrayType: + serde::Serialize + IntoSeries, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_map(Some(2))?; + s.serialize_entry("field", self.field.as_ref())?; + s.serialize_entry("values", &self.physical.clone().into_series())?; + s.end() + } +} diff --git a/src/daft-core/src/datatypes/mod.rs b/src/daft-core/src/datatypes/mod.rs index 305877b860..9d1d614131 100644 --- a/src/daft-core/src/datatypes/mod.rs +++ b/src/daft-core/src/datatypes/mod.rs @@ -6,8 +6,6 @@ mod image_mode; mod matching; mod time_unit; -use std::ops::{Add, Div, Mul, Rem, Sub}; - pub use crate::array::{DataArray, FixedSizeListArray}; use crate::array::{ListArray, StructArray}; use arrow2::{ @@ -21,6 +19,8 @@ pub use field::FieldID; pub use image_format::ImageFormat; pub use image_mode::ImageMode; use num_traits::{Bounded, Float, FromPrimitive, Num, NumCast, ToPrimitive, Zero}; +use serde::Serialize; +use std::ops::{Add, Div, Mul, Rem, Sub}; pub use time_unit::TimeUnit; pub mod logical; @@ -201,6 +201,7 @@ pub trait NumericNative: + Bounded + FromPrimitive + ToPrimitive + + Serialize { type DAFTTYPE: DaftNumericType; } diff --git a/src/daft-core/src/python/series.rs b/src/daft-core/src/python/series.rs index f3c8cb3fa0..3c54a0ee04 100644 --- a/src/daft-core/src/python/series.rs +++ b/src/daft-core/src/python/series.rs @@ -1,6 +1,11 @@ use std::ops::{Add, Div, Mul, Rem, Sub}; -use pyo3::{exceptions::PyValueError, prelude::*, pyclass::CompareOp, types::PyList}; +use pyo3::{ + exceptions::PyValueError, + prelude::*, + pyclass::CompareOp, + types::{PyBytes, PyList}, +}; use crate::{ array::{ops::DaftLogical, pseudo_arrow::PseudoArrowArray, DataArray}, @@ -316,6 +321,17 @@ impl PySeries { pub fn is_null(&self) -> PyResult { Ok(self.series.is_null()?.into()) } + + pub fn _debug_bincode_serialize(&self, py: Python) -> PyResult { + let values = bincode::serialize(&self.series).unwrap(); + Ok(PyBytes::new(py, &values).to_object(py)) + } + + #[staticmethod] + pub fn _debug_bincode_deserialize(bytes: &PyBytes) -> PyResult { + let values = bincode::deserialize::(bytes.as_bytes()).unwrap(); + Ok(Self { series: values }) + } } impl From for PySeries { diff --git a/src/daft-core/src/series/mod.rs b/src/daft-core/src/series/mod.rs index 27be057048..40cfe5b5f2 100644 --- a/src/daft-core/src/series/mod.rs +++ b/src/daft-core/src/series/mod.rs @@ -1,8 +1,8 @@ mod array_impl; mod from; mod ops; +mod serdes; mod series_like; - use std::{ fmt::{Display, Formatter, Result}, sync::Arc, diff --git a/src/daft-core/src/series/serdes.rs b/src/daft-core/src/series/serdes.rs new file mode 100644 index 0000000000..8d9370a7cf --- /dev/null +++ b/src/daft-core/src/series/serdes.rs @@ -0,0 +1,296 @@ +use std::{borrow::Cow, sync::Arc}; + +use arrow2::offset::OffsetsBuffer; +use serde::{de::Visitor, Deserializer}; + +use crate::{ + array::{ + ops::{as_arrow::AsArrow, full::FullNull}, + ListArray, StructArray, + }, + datatypes::logical::{ + DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray, + FixedShapeTensorArray, ImageArray, TensorArray, TimestampArray, + }, + with_match_daft_types, DataType, IntoSeries, Series, +}; + +impl serde::Serialize for Series { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + with_match_daft_types!(self.data_type(), |$T| { + let array = self.downcast::<<$T as DaftDataType>::ArrayType>().unwrap(); + array.serialize(serializer) + }) + } +} + +impl<'d> serde::Deserialize<'d> for Series { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'d>, + { + const EXPECTED_KEYS: &[&str] = &["field", "values"]; + + struct SeriesVisitor; + + impl<'d> Visitor<'d> for SeriesVisitor { + type Value = Series; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("struct {field: , values: }") + } + + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'d>, + { + let mut field: Option = None; + let mut values_set = false; + while let Some(key) = map.next_key::>()? { + match key.as_ref() { + "field" => { + field = Some(map.next_value()?); + } + "values" => { + values_set = true; + break; + } + unknown => { + return Err(serde::de::Error::unknown_field(unknown, EXPECTED_KEYS)) + } + } + } + if !values_set { + return Err(serde::de::Error::missing_field("values")); + } + let field = field.ok_or_else(|| serde::de::Error::missing_field("name"))?; + use crate::datatypes::*; + use DataType::*; + match &field.dtype { + Null => Ok(NullArray::full_null( + &field.name, + &field.dtype, + map.next_value::()?, + ) + .into_series()), + Boolean => Ok(BooleanArray::from(( + field.name.as_str(), + map.next_value::>>()?.as_slice(), + )) + .into_series()), + Int8 => Ok(Int8Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Int16 => Ok(Int16Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Int32 => Ok(Int32Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Int64 => Ok(Int64Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Int128 => Ok(Int128Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + UInt8 => Ok(UInt8Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + UInt16 => Ok(UInt16Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + UInt32 => Ok(UInt32Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + UInt64 => Ok(UInt64Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Float32 => Ok(Float32Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Float64 => Ok(Float64Array::from_iter( + field.name.as_str(), + map.next_value::>>()?.into_iter(), + ) + .into_series()), + Utf8 => Ok(Utf8Array::from_iter( + field.name.as_str(), + map.next_value::>>>()?.into_iter(), + ) + .into_series()), + Binary => Ok(BinaryArray::from_iter( + field.name.as_str(), + map.next_value::>>>()?.into_iter(), + ) + .into_series()), + Extension(..) => { + let physical = map.next_value::()?; + let physical = physical.to_arrow(); + let ext_array = physical.to_type(field.dtype.to_arrow().unwrap()); + Ok(ExtensionArray::new(Arc::new(field), ext_array) + .unwrap() + .into_series()) + } + Struct(..) => { + let mut all_series = map.next_value::>>()?; + let validity = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("validity"))?; + let children = all_series + .into_iter() + .map(|s| s.unwrap()) + .collect::>(); + + let validity = validity.map(|v| v.bool().unwrap().as_bitmap().clone()); + Ok(StructArray::new(Arc::new(field), children, validity).into_series()) + } + List(..) => { + let mut all_series = map.next_value::>>()?; + let validity = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("validity"))?; + let validity = validity.map(|v| v.bool().unwrap().as_bitmap().clone()); + let offsets_series = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("offsets"))? + .unwrap(); + let offsets_array = offsets_series.i64().unwrap(); + let offsets = OffsetsBuffer::::try_from( + offsets_array.as_arrow().values().clone(), + ) + .unwrap(); + let flat_child = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("flat_child"))? + .unwrap(); + Ok(ListArray::new(field, flat_child, offsets, validity).into_series()) + } + FixedSizeList(..) => { + let mut all_series = map.next_value::>>()?; + let validity = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("validity"))?; + let flat_child = all_series + .pop() + .ok_or_else(|| serde::de::Error::missing_field("flat_child"))? + .unwrap(); + + let validity = validity.map(|v| v.bool().unwrap().as_bitmap().clone()); + Ok(FixedSizeListArray::new(field, flat_child, validity).into_series()) + } + Decimal128(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::()?; + Ok(Decimal128Array::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series()) + } + Timestamp(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::()?; + Ok(TimestampArray::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series()) + } + Date => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::()?; + Ok( + DateArray::new(field, physical.downcast::().unwrap().clone()) + .into_series(), + ) + } + Time(..) => panic!("Time Deserialization not implemented"), + Duration(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::()?; + Ok( + DurationArray::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series(), + ) + } + Embedding(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::()?; + Ok(EmbeddingArray::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series()) + } + Image(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::()?; + Ok( + ImageArray::new(field, physical.downcast::().unwrap().clone()) + .into_series(), + ) + } + FixedShapeImage(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::()?; + Ok(FixedShapeImageArray::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series()) + } + FixedShapeTensor(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::()?; + Ok(FixedShapeTensorArray::new( + field, + physical.downcast::().unwrap().clone(), + ) + .into_series()) + } + Tensor(..) => { + type PType = <::PhysicalType as DaftDataType>::ArrayType; + let physical = map.next_value::()?; + Ok( + TensorArray::new(field, physical.downcast::().unwrap().clone()) + .into_series(), + ) + } + Python => { + panic!("python deserialization not implemented for rust Serde"); + } + Unknown => { + panic!("Unable to deserialize Unknown DataType"); + } + } + } + } + deserializer.deserialize_map(SeriesVisitor) + } +} diff --git a/src/daft-csv/src/metadata.rs b/src/daft-csv/src/metadata.rs index 0e4fcd7b33..3a67af421a 100644 --- a/src/daft-csv/src/metadata.rs +++ b/src/daft-csv/src/metadata.rs @@ -327,8 +327,7 @@ mod tests { Field::new("petal.length", DataType::Float64), Field::new("petal.width", DataType::Float64), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 328); assert_eq!(num_records_read, 20); @@ -363,8 +362,7 @@ mod tests { Field::new("petal.length", DataType::Float64), Field::new("petal.width", DataType::Float64), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 328); assert_eq!(num_records_read, 20); @@ -409,8 +407,7 @@ mod tests { Field::new("column_3", DataType::Float64), Field::new("column_4", DataType::Float64), Field::new("column_5", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 328); assert_eq!(num_records_read, 20); @@ -439,8 +436,7 @@ mod tests { Field::new("petal.length", DataType::Float64), Field::new("petal.width", DataType::Float64), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 49); assert_eq!(num_records_read, 3); @@ -466,8 +462,7 @@ mod tests { Field::new("petal.length", DataType::Float64), Field::new("petal.width", DataType::Float64), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 82); assert_eq!(num_records_read, 6); @@ -497,8 +492,7 @@ mod tests { Field::new("petal.length", DataType::Utf8), Field::new("petal.width", DataType::Utf8), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); assert_eq!(total_bytes_read, 33); assert_eq!(num_records_read, 2); @@ -530,8 +524,7 @@ mod tests { Field::new("petal.length", DataType::Float64), Field::new("petal.width", DataType::Float64), Field::new("variety", DataType::Utf8), - ])? - .into(), + ])?, ); // Max bytes doesn't include header, so add 15 bytes to upper bound. assert!(total_bytes_read <= 100 + 15, "{}", total_bytes_read); diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 693a2d0cdb..303b07423d 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -462,19 +462,13 @@ mod tests { .unwrap(); let (mut fields, _) = infer_schema(&mut reader, None, has_header, &infer).unwrap(); if !has_header && let Some(column_names) = column_names { - fields = fields.into_iter().zip(column_names.into_iter()).map(|(field, name)| arrow2::datatypes::Field::new(name, field.data_type, true).with_metadata(field.metadata)).collect::>(); + fields = fields.into_iter().zip(column_names).map(|(field, name)| arrow2::datatypes::Field::new(name, field.data_type, true).with_metadata(field.metadata)).collect::>(); } let mut rows = vec![ByteRecord::default(); limit.unwrap_or(100)]; let rows_read = read_rows(&mut reader, 0, &mut rows).unwrap(); let rows = &rows[..rows_read]; - let chunk = deserialize_batch( - rows, - &fields, - projection.as_ref().map(|p| p.as_slice()), - 0, - deserialize_column, - ) - .unwrap(); + let chunk = + deserialize_batch(rows, &fields, projection.as_deref(), 0, deserialize_column).unwrap(); if let Some(projection) = projection { fields = projection .into_iter() @@ -1216,7 +1210,7 @@ mod tests { let column_names = vec!["a", "b"]; let table = read_csv( - file.as_ref(), + file, Some(column_names.clone()), None, None, @@ -1254,7 +1248,7 @@ mod tests { let column_names = vec!["a", "b"]; let table = read_csv( - file.as_ref(), + file, Some(column_names.clone()), Some(vec!["b"]), None, diff --git a/tests/series/test_series.py b/tests/series/test_series.py index 7104ba891f..a56034b6fa 100644 --- a/tests/series/test_series.py +++ b/tests/series/test_series.py @@ -2,7 +2,9 @@ import copy from collections import Counter +from datetime import date, datetime +import numpy as np import pyarrow as pa import pytest @@ -128,3 +130,65 @@ def test_series_pickling(dtype) -> None: copied_s = copy.deepcopy(s) assert s.datatype() == copied_s.datatype() assert s.to_pylist() == copied_s.to_pylist() + + +@pytest.mark.parametrize("dtype", ARROW_FLOAT_TYPES + ARROW_INT_TYPES + ARROW_STRING_TYPES) +def test_series_bincode_serdes(dtype) -> None: + s = Series.from_pylist([1, 2, 3, None]).cast(DataType.from_arrow_type(dtype)) + serialized = s._debug_bincode_serialize() + copied_s = Series._debug_bincode_deserialize(serialized) + + assert s.name() == copied_s.name() + assert s.datatype() == copied_s.datatype() + assert s.to_pylist() == copied_s.to_pylist() + + +@pytest.mark.parametrize( + "data", + [ + [{"a": "foo", "b": "bar"}, None, {"b": "baz", "c": "quux"}], + [[1, 2, 3], None, [4, 5]], + [datetime.now(), None, datetime.now()], + [date.today(), date.today(), None], + ], +) +def test_series_bincode_serdes_on_data(data) -> None: + s = Series.from_arrow(pa.array(data)) + serialized = s._debug_bincode_serialize() + copied_s = Series._debug_bincode_deserialize(serialized) + + assert s.name() == copied_s.name() + assert s.datatype() == copied_s.datatype() + assert s.to_pylist() == copied_s.to_pylist() + + +def test_series_bincode_serdes_on_ext_type(uuid_ext_type) -> None: + arr = pa.array(f"{i}".encode() for i in range(5)) + data = pa.ExtensionArray.from_storage(uuid_ext_type, arr) + s = Series.from_arrow(data) + serialized = s._debug_bincode_serialize() + copied_s = Series._debug_bincode_deserialize(serialized) + + assert s.name() == copied_s.name() + assert s.datatype() == copied_s.datatype() + assert s.to_pylist() == copied_s.to_pylist() + + +def test_series_bincode_serdes_on_complex_types() -> None: + s = Series.from_pylist([np.ones((4, 5)), np.ones((2, 10))]) + serialized = s._debug_bincode_serialize() + copied_s = Series._debug_bincode_deserialize(serialized) + + assert s.name() == copied_s.name() + assert s.datatype() == copied_s.datatype() + assert all(np.all(l == r) for l, r in zip(s.to_pylist(), copied_s.to_pylist())) + + +def test_series_bincode_serdes_on_null_types() -> None: + s = Series.from_pylist([None, None, None]) + serialized = s._debug_bincode_serialize() + copied_s = Series._debug_bincode_deserialize(serialized) + + assert s.name() == copied_s.name() + assert s.datatype() == copied_s.datatype() + assert s.to_pylist() == copied_s.to_pylist()