Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] implement series serde #1519

Merged
merged 6 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,9 @@ class PySeries:
def image_resize(self, w: int, h: int) -> PySeries: ...
def if_else(self, other: PySeries, predicate: PySeries) -> PySeries: ...
def is_null(self) -> PySeries: ...
def _debug_bincode_serialize(self) -> bytes: ...
@staticmethod
def _debug_bincode_deserialize(b: bytes) -> PySeries: ...

class PyTable:
def schema(self) -> PySchema: ...
Expand Down
7 changes: 7 additions & 0 deletions daft/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,13 @@ def _from_arrow_table_to_series(cls, table: pa.Table, extension_type: pa.Extensi
array = extension_type.wrap_array(array)
return cls.from_arrow(array, name)

def _debug_bincode_serialize(self) -> bytes:
return self._series._debug_bincode_serialize()

@classmethod
def _debug_bincode_deserialize(cls, b: bytes) -> Series:
return Series._from_pyseries(PySeries._debug_bincode_deserialize(b))


SomeSeriesNamespace = TypeVar("SomeSeriesNamespace", bound="SeriesNamespace")

Expand Down
9 changes: 9 additions & 0 deletions src/daft-core/src/array/boolean.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use crate::datatypes::BooleanArray;

use super::ops::as_arrow::AsArrow;

impl BooleanArray {
pub fn as_bitmap(&self) -> &arrow2::bitmap::Bitmap {
self.as_arrow().values()
}
}
26 changes: 26 additions & 0 deletions src/daft-core/src/array/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::datatypes::{
};

use crate::array::DataArray;

use common_error::{DaftError, DaftResult};

impl<T: DaftNumericType> From<(&str, Box<arrow2::array::PrimitiveArray<T::Native>>)>
Expand Down Expand Up @@ -63,6 +64,16 @@ impl From<(&str, &[bool])> for BooleanArray {
}
}

impl From<(&str, &[Option<bool>])> for BooleanArray {
fn from(item: (&str, &[Option<bool>])) -> Self {
let (name, slice) = item;
let arrow_array = Box::new(arrow2::array::BooleanArray::from_trusted_len_iter(
slice.iter().cloned(),
));
DataArray::new(Field::new(name, DataType::Boolean).into(), arrow_array).unwrap()
}
}

impl From<(&str, arrow2::array::BooleanArray)> for BooleanArray {
fn from(item: (&str, arrow2::array::BooleanArray)) -> Self {
let (name, arrow_array) = item;
Expand All @@ -74,6 +85,21 @@ impl From<(&str, arrow2::array::BooleanArray)> for BooleanArray {
}
}

impl From<(&str, arrow2::bitmap::Bitmap)> for BooleanArray {
fn from(item: (&str, arrow2::bitmap::Bitmap)) -> Self {
let (name, bitmap) = item;
DataArray::new(
Field::new(name, DataType::Boolean).into(),
Box::new(arrow2::array::BooleanArray::new(
arrow2::datatypes::DataType::Boolean,
bitmap,
None,
)),
)
.unwrap()
}
}

impl From<(&str, Box<arrow2::array::BooleanArray>)> for BooleanArray {
fn from(item: (&str, Box<arrow2::array::BooleanArray>)) -> Self {
let (name, arrow_array) = item;
Expand Down
43 changes: 43 additions & 0 deletions src/daft-core/src/array/from_iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::datatypes::{BinaryArray, DaftNumericType, Field, Utf8Array};

use super::DataArray;

impl<T> DataArray<T>
where
T: DaftNumericType,
{
pub fn from_iter(
name: &str,
iter: impl Iterator<Item = Option<T::Native>> + arrow2::trusted_len::TrustedLen,
) -> Self {
let arrow_array =
Box::new(arrow2::array::PrimitiveArray::<T::Native>::from_trusted_len_iter(iter));
DataArray::new(Field::new(name, T::get_dtype()).into(), arrow_array).unwrap()
}
}

impl Utf8Array {
pub fn from_iter<S: AsRef<str>>(
name: &str,
iter: impl Iterator<Item = Option<S>> + arrow2::trusted_len::TrustedLen,
) -> Self {
let arrow_array = Box::new(arrow2::array::Utf8Array::<i64>::from_trusted_len_iter(iter));
DataArray::new(Field::new(name, crate::DataType::Utf8).into(), arrow_array).unwrap()
}
}

impl BinaryArray {
pub fn from_iter<S: AsRef<[u8]>>(
name: &str,
iter: impl Iterator<Item = Option<S>> + arrow2::trusted_len::TrustedLen,
) -> Self {
let arrow_array = Box::new(arrow2::array::BinaryArray::<i64>::from_trusted_len_iter(
iter,
));
DataArray::new(
Field::new(name, crate::DataType::Binary).into(),
arrow_array,
)
.unwrap()
}
}
9 changes: 6 additions & 3 deletions src/daft-core/src/array/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
mod fixed_size_list_array;
pub mod from;
pub mod growable;
pub mod iterator;
mod list_array;
pub mod ops;
pub mod pseudo_arrow;

mod fixed_size_list_array;
mod list_array;
mod serdes;
mod struct_array;
pub use fixed_size_list_array::FixedSizeListArray;
pub use list_array::ListArray;

pub use struct_array::StructArray;
mod boolean;
mod from_iter;

use std::{marker::PhantomData, sync::Arc};

Expand Down
224 changes: 224 additions & 0 deletions src/daft-core/src/array/serdes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
use std::cell::RefCell;

use serde::ser::SerializeMap;

use crate::{
datatypes::{
logical::LogicalArray, BinaryArray, BooleanArray, DaftLogicalType, DaftNumericType,
ExtensionArray, Int64Array, NullArray, Utf8Array,
},
DataType, IntoSeries, Series,
};

#[cfg(feature = "python")]
use crate::datatypes::PythonArray;

use super::{ops::as_arrow::AsArrow, DataArray, FixedSizeListArray, ListArray, StructArray};

// adapted from Polars Serdes iterator
pub struct IterSer<I>
where
I: IntoIterator,
<I as IntoIterator>::Item: serde::Serialize,
{
iter: RefCell<Option<I>>,
}

impl<I> IterSer<I>
where
I: IntoIterator,
<I as IntoIterator>::Item: serde::Serialize,
{
fn new(iter: I) -> Self {
IterSer {
iter: RefCell::new(Some(iter)),
}
}
}

impl<I> serde::Serialize for IterSer<I>
where
I: IntoIterator,
<I as IntoIterator>::Item: serde::Serialize,
{
fn serialize<S>(
&self,
serializer: S,
) -> std::result::Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
where
S: serde::Serializer,
{
let iter: I = self.iter.borrow_mut().take().unwrap();
serializer.collect_seq(iter)
}
}

impl<T: DaftNumericType> serde::Serialize for DataArray<T> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_map(Some(2))?;
s.serialize_entry("field", self.field())?;
s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?;
s.end()
}
}

impl serde::Serialize for Utf8Array {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_map(Some(2))?;
s.serialize_entry("field", self.field())?;
s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?;
s.end()
}
}

impl serde::Serialize for BooleanArray {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_map(Some(2))?;
s.serialize_entry("field", self.field())?;
s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?;
s.end()
}
}

impl serde::Serialize for BinaryArray {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_map(Some(2))?;
s.serialize_entry("field", self.field())?;
s.serialize_entry("values", &IterSer::new(self.as_arrow().iter()))?;
s.end()
}
}

impl serde::Serialize for NullArray {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_map(Some(2))?;
s.serialize_entry("field", self.field())?;
s.serialize_entry("values", &self.len())?;
s.end()
}
}

impl serde::Serialize for ExtensionArray {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_map(Some(2))?;
s.serialize_entry("field", self.field())?;
let values = if let DataType::Extension(_, inner, _) = self.data_type() {
Series::try_from(("physical", self.data.to_type(inner.to_arrow().unwrap()))).unwrap()
} else {
panic!("Expected Extension Type!")
};
s.serialize_entry("values", &values)?;
s.end()
}
}

#[cfg(feature = "python")]
impl serde::Serialize for PythonArray {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
panic!("Rust Serde is not implemented for Python Arrays")
}
}

impl serde::Serialize for StructArray {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_map(Some(2))?;

let mut values = Vec::with_capacity(self.children.len() + 1);
values.extend(self.children.iter().map(Some));

let validity = self
.validity()
.map(|b| BooleanArray::from(("validity", b.clone())).into_series());
values.push(validity.as_ref());

s.serialize_entry("field", self.field.as_ref())?;
s.serialize_entry("values", &values)?;
s.end()
}
}

impl serde::Serialize for ListArray {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_map(Some(2))?;
let mut values = Vec::with_capacity(3);

values.push(Some(&self.flat_child));

let arrow2_offsets = arrow2::array::Int64Array::new(
arrow2::datatypes::DataType::Int64,
self.offsets().buffer().clone(),
None,
);
let offsets = Int64Array::from(("offsets", Box::new(arrow2_offsets))).into_series();
values.push(Some(&offsets));

let validity = self
.validity()
.map(|b| BooleanArray::from(("validity", b.clone())).into_series());
values.push(validity.as_ref());

s.serialize_entry("field", self.field.as_ref())?;
s.serialize_entry("values", &values)?;
s.end()
}
}

impl serde::Serialize for FixedSizeListArray {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_map(Some(2))?;

let validity = self
.validity()
.map(|b| BooleanArray::from(("validity", b.clone())).into_series());
let values = vec![Some(&self.flat_child), validity.as_ref()];
s.serialize_entry("field", self.field.as_ref())?;
s.serialize_entry("values", &values)?;
s.end()
}
}

impl<L: DaftLogicalType> serde::Serialize for LogicalArray<L>
where
<<L as DaftLogicalType>::PhysicalType as crate::datatypes::DaftDataType>::ArrayType:
serde::Serialize + IntoSeries,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_map(Some(2))?;
s.serialize_entry("field", self.field.as_ref())?;
s.serialize_entry("values", &self.physical.clone().into_series())?;
s.end()
}
}
5 changes: 3 additions & 2 deletions src/daft-core/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ mod image_mode;
mod matching;
mod time_unit;

use std::ops::{Add, Div, Mul, Rem, Sub};

pub use crate::array::{DataArray, FixedSizeListArray};
use crate::array::{ListArray, StructArray};
use arrow2::{
Expand All @@ -21,6 +19,8 @@ pub use field::FieldID;
pub use image_format::ImageFormat;
pub use image_mode::ImageMode;
use num_traits::{Bounded, Float, FromPrimitive, Num, NumCast, ToPrimitive, Zero};
use serde::Serialize;
use std::ops::{Add, Div, Mul, Rem, Sub};
pub use time_unit::TimeUnit;

pub mod logical;
Expand Down Expand Up @@ -201,6 +201,7 @@ pub trait NumericNative:
+ Bounded
+ FromPrimitive
+ ToPrimitive
+ Serialize
{
type DAFTTYPE: DaftNumericType;
}
Expand Down
Loading
Loading