Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Not_Null Expression #1777

Merged
merged 2 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ class PyExpr:
def __eq__(self, other: PyExpr) -> PyExpr: ... # type: ignore[override]
def __ne__(self, other: PyExpr) -> PyExpr: ... # type: ignore[override]
def is_null(self) -> PyExpr: ...
def not_null(self) -> PyExpr: ...
def name(self) -> str: ...
def to_field(self, schema: PySchema) -> PyField: ...
def __repr__(self) -> str: ...
Expand Down Expand Up @@ -933,6 +934,7 @@ class PySeries:
def image_resize(self, w: int, h: int) -> PySeries: ...
def if_else(self, other: PySeries, predicate: PySeries) -> PySeries: ...
def is_null(self) -> PySeries: ...
def not_null(self) -> PySeries: ...
def _debug_bincode_serialize(self) -> bytes: ...
@staticmethod
def _debug_bincode_deserialize(b: bytes) -> PySeries: ...
Expand Down
13 changes: 13 additions & 0 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,19 @@ def is_null(self) -> Expression:
expr = self._expr.is_null()
return Expression._from_pyexpr(expr)

def not_null(self) -> Expression:
    """Flags every value of this Expression that is present, i.e. not Null.

    Null is the special value that marks missing data. NaN is not Null, so it
    is reported as present (see the example below).

    Example:
        >>> # [1., None, NaN] -> [True, False, True]
        >>> col("x").not_null()

    Returns:
        Expression: Boolean Expression that is True wherever a value is not missing
    """
    return Expression._from_pyexpr(self._expr.not_null())

def name(self) -> builtins.str:
return self._expr.name()

Expand Down
4 changes: 4 additions & 0 deletions daft/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,10 @@ def is_null(self) -> Series:
assert self._series is not None
return Series._from_pyseries(self._series.is_null())

def not_null(self) -> Series:
    """Runs the ``not_null`` check of the wrapped native series.

    Returns:
        Series: the result of the native ``not_null`` call, re-wrapped as a Series
    """
    native = self._series
    assert native is not None
    return Series._from_pyseries(native.not_null())

@property
def float(self) -> SeriesFloatNamespace:
return SeriesFloatNamespace.from_series(self)
Expand Down
5 changes: 5 additions & 0 deletions src/daft-core/src/array/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ pub trait DaftIsNull {
fn is_null(&self) -> Self::Output;
}

/// Null check in the "value is present" direction; the counterpart of `DaftIsNull`.
pub trait DaftNotNull {
/// Result type of the check; each implementor picks its own.
type Output;
/// Reports which entries of `self` are not null.
fn not_null(&self) -> Self::Output;
}

pub trait DaftIsNan {
type Output;
fn is_nan(&self) -> Self::Output;
Expand Down
97 changes: 72 additions & 25 deletions src/daft-core/src/array/ops/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,25 @@ use crate::{
};
use common_error::DaftResult;

use super::DaftIsNull;
use super::{DaftIsNull, DaftNotNull};

impl<T> DaftIsNull for DataArray<T>
impl<T> DataArray<T>
where
T: DaftPhysicalType,
{
type Output = DaftResult<DataArray<BooleanType>>;

fn is_null(&self) -> Self::Output {
// Common functionality for nullity checks
fn check_nullity(&self, is_null: bool) -> DaftResult<DataArray<BooleanType>> {
let arrow_array = &self.data;
let result_arrow_array = Box::new(match arrow_array.validity() {
// If the bitmap is None, the arrow array doesn't have null values
// (unless it's a NullArray - so check the null count)
None => match arrow_array.null_count() {
0 => arrow2::array::BooleanArray::from_slice(vec![false; arrow_array.len()]),
_ => arrow2::array::BooleanArray::from_slice(vec![true; arrow_array.len()]),
0 => arrow2::array::BooleanArray::from_slice(vec![!is_null; arrow_array.len()]), // false for is_null and true for not_null
_ => arrow2::array::BooleanArray::from_slice(vec![is_null; arrow_array.len()]), // true for is_null and false for not_null
},
Some(bitmap) => arrow2::array::BooleanArray::new(
arrow2::datatypes::DataType::Boolean,
!bitmap,
if is_null { !bitmap } else { bitmap.clone() }, // flip the bitmap for is_null
None,
),
});
Expand All @@ -38,29 +37,73 @@ where
}
}

/// `is_null` for every physical `DataArray`, expressed through the shared
/// `check_nullity` helper.
impl<T: DaftPhysicalType> DaftIsNull for DataArray<T> {
    type Output = DaftResult<DataArray<BooleanType>>;

    /// Returns a boolean array that is `true` exactly where `self` is null.
    fn is_null(&self) -> Self::Output {
        // `true` selects the "mark the nulls" direction of the helper.
        self.check_nullity(true)
    }
}

/// `not_null` for every physical `DataArray`, expressed through the shared
/// `check_nullity` helper.
impl<T: DaftPhysicalType> DaftNotNull for DataArray<T> {
    type Output = DaftResult<DataArray<BooleanType>>;

    /// Returns a boolean array that is `true` exactly where `self` is not null.
    fn not_null(&self) -> Self::Output {
        // `false` selects the "mark the present values" direction of the helper.
        self.check_nullity(false)
    }
}

// Shared body for the nested-array `is_null` / `not_null` implementations.
//
// `$arr` is the nested array to inspect and `$is_null` picks the direction:
// `true` marks the null slots, `false` marks the present ones. The expansion
// evaluates to `Ok(BooleanArray)` carrying the same name as `$arr`.
macro_rules! check_nullity_nested_array {
    ($arr:expr, $is_null:expr) => {{
        match $arr.validity() {
            // A validity bitmap has a set bit for every present value, so it is
            // already the `not_null` answer; `is_null` needs its complement.
            Some(validity) => {
                let mask = if $is_null {
                    !validity
                } else {
                    validity.clone()
                };
                let values = arrow2::array::BooleanArray::new(
                    arrow2::datatypes::DataType::Boolean,
                    mask,
                    None,
                );
                Ok(BooleanArray::from(($arr.name(), values)))
            }
            // No bitmap: every slot is treated as present, so the answer is constant.
            None => {
                let filled: Vec<_> = repeat(!$is_null).take($arr.len()).collect();
                Ok(BooleanArray::from(($arr.name(), filled.as_slice())))
            }
        }
    }};
}

macro_rules! impl_is_null_nested_array {
($arr:ident) => {
impl DaftIsNull for $arr {
type Output = DaftResult<DataArray<BooleanType>>;

fn is_null(&self) -> Self::Output {
match self.validity() {
None => Ok(BooleanArray::from((
self.name(),
repeat(false)
.take(self.len())
.collect::<Vec<_>>()
.as_slice(),
))),
Some(validity) => Ok(BooleanArray::from((
self.name(),
arrow2::array::BooleanArray::new(
arrow2::datatypes::DataType::Boolean,
validity.clone(),
None,
),
))),
}
check_nullity_nested_array!(self, true)
}
}
};
}

macro_rules! impl_not_null_nested_array {
($arr:ident) => {
impl DaftNotNull for $arr {
type Output = DaftResult<DataArray<BooleanType>>;

fn not_null(&self) -> Self::Output {
check_nullity_nested_array!(self, false)
}
}
};
Expand All @@ -70,6 +113,10 @@ impl_is_null_nested_array!(ListArray);
impl_is_null_nested_array!(FixedSizeListArray);
impl_is_null_nested_array!(StructArray);

impl_not_null_nested_array!(ListArray);
impl_not_null_nested_array!(FixedSizeListArray);
impl_not_null_nested_array!(StructArray);

impl<T> DataArray<T>
where
T: DaftPhysicalType,
Expand Down
4 changes: 4 additions & 0 deletions src/daft-core/src/python/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ impl PySeries {
Ok(self.series.is_null()?.into())
}

/// Python-facing `not_null`: runs the check on the wrapped series and wraps
/// the resulting series again. Errors from the check are propagated.
pub fn not_null(&self) -> PyResult<Self> {
    let mask = self.series.not_null()?;
    Ok(mask.into())
}

pub fn _debug_bincode_serialize(&self, py: Python) -> PyResult<PyObject> {
let values = bincode::serialize(&self.series).unwrap();
Ok(PyBytes::new(py, &values).to_object(py))
Expand Down
7 changes: 7 additions & 0 deletions src/daft-core/src/series/array_impl/data_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ macro_rules! impl_series_like_for_data_array {

Ok(DaftIsNull::is_null(&self.0)?.into_series())
}

// Runs the array-level `DaftNotNull` check on the wrapped array and
// converts the resulting boolean array into a `Series`.
fn not_null(&self) -> DaftResult<Series> {
    use crate::array::ops::DaftNotNull;

    let mask = DaftNotNull::not_null(&self.0)?;
    Ok(mask.into_series())
}

fn len(&self) -> usize {
self.0.len()
}
Expand Down
6 changes: 6 additions & 0 deletions src/daft-core/src/series/array_impl/logical_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ macro_rules! impl_series_like_for_logical_array {
Ok(DaftIsNull::is_null(&self.0.physical)?.into_series())
}

// Nullity of a logical array is taken from its physical backing array:
// run `DaftNotNull` on `physical` and convert the result into a `Series`.
fn not_null(&self) -> DaftResult<Series> {
    use crate::array::ops::DaftNotNull;

    let mask = DaftNotNull::not_null(&self.0.physical)?;
    Ok(mask.into_series())
}

fn len(&self) -> usize {
self.0.physical.len()
}
Expand Down
6 changes: 5 additions & 1 deletion src/daft-core/src/series/array_impl/nested_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use common_error::{DaftError, DaftResult};

use crate::array::ops::broadcast::Broadcastable;
use crate::array::ops::{DaftIsNull, GroupIndices};
use crate::array::ops::{DaftIsNull, DaftNotNull, GroupIndices};
use crate::array::{FixedSizeListArray, ListArray, StructArray};
use crate::datatypes::BooleanArray;
use crate::datatypes::Field;
Expand Down Expand Up @@ -105,6 +105,10 @@ macro_rules! impl_series_like_for_nested_arrays {
Ok(self.0.is_null()?.into_series())
}

// Runs the nested array's own `not_null` and converts the resulting
// boolean array into a `Series`.
fn not_null(&self) -> DaftResult<Series> {
    let mask = self.0.not_null()?;
    Ok(mask.into_series())
}

fn sort(&self, _descending: bool) -> DaftResult<Series> {
Err(DaftError::ValueError(format!(
"Cannot sort a {}",
Expand Down
4 changes: 4 additions & 0 deletions src/daft-core/src/series/ops/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@ impl Series {
pub fn is_null(&self) -> DaftResult<Series> {
self.inner.is_null()
}

/// Delegates to the `not_null` implementation of the wrapped `inner` series.
pub fn not_null(&self) -> DaftResult<Series> {
self.inner.not_null()
}
}
1 change: 1 addition & 0 deletions src/daft-core/src/series/series_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub trait SeriesLike: Send + Sync + Any + std::fmt::Debug {
fn rename(&self, name: &str) -> Series;
fn size_bytes(&self) -> DaftResult<usize>;
fn is_null(&self) -> DaftResult<Series>;
fn not_null(&self) -> DaftResult<Series>;
fn sort(&self, descending: bool) -> DaftResult<Series>;
fn head(&self, num: usize) -> DaftResult<Series>;
fn slice(&self, start: usize, end: usize) -> DaftResult<Series>;
Expand Down
16 changes: 15 additions & 1 deletion src/daft-dsl/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub enum Expr {
},
Not(ExprRef),
IsNull(ExprRef),
NotNull(ExprRef),
Literal(lit::LiteralValue),
IfElse {
if_true: ExprRef,
Expand Down Expand Up @@ -283,6 +284,10 @@ impl Expr {
Expr::IsNull(self.clone().into())
}

pub fn not_null(&self) -> Self {
Expr::NotNull(self.clone().into())
}

pub fn eq(&self, other: &Self) -> Self {
binary_op(Operator::Eq, self, other)
}
Expand Down Expand Up @@ -338,6 +343,10 @@ impl Expr {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("{child_id}.is_null()"))
}
NotNull(expr) => {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("{child_id}.not_null()"))
}
Function { func, inputs } => {
let inputs = inputs
.iter()
Expand Down Expand Up @@ -381,7 +390,9 @@ impl Expr {
Literal(..) => vec![],

// One child.
Not(expr) | IsNull(expr) | Cast(expr, ..) | Alias(expr, ..) => vec![expr.clone()],
Not(expr) | IsNull(expr) | NotNull(expr) | Cast(expr, ..) | Alias(expr, ..) => {
vec![expr.clone()]
}
Agg(agg_expr) => vec![agg_expr.child()],

// Multiple children.
Expand Down Expand Up @@ -416,6 +427,7 @@ impl Expr {
}
}
IsNull(expr) => Ok(Field::new(expr.name()?, DataType::Boolean)),
NotNull(expr) => Ok(Field::new(expr.name()?, DataType::Boolean)),
Literal(value) => Ok(Field::new("literal", value.get_type())),
Function { func, inputs } => func.to_field(inputs.as_slice(), schema, self),
BinaryOp { op, left, right } => {
Expand Down Expand Up @@ -497,6 +509,7 @@ impl Expr {
Column(name) => Ok(name.as_ref()),
Not(expr) => expr.name(),
IsNull(expr) => expr.name(),
NotNull(expr) => expr.name(),
Literal(..) => Ok("literal"),
Function { func: _, inputs } => inputs.first().unwrap().name(),
BinaryOp {
Expand Down Expand Up @@ -549,6 +562,7 @@ impl Display for Expr {
Column(name) => write!(f, "col({name})"),
Not(expr) => write!(f, "not({expr})"),
IsNull(expr) => write!(f, "is_null({expr})"),
NotNull(expr) => write!(f, "not_null({expr})"),
Literal(val) => write!(f, "lit({val})"),
Function { func, inputs } => {
write!(f, "{}(", func.fn_name())?;
Expand Down
1 change: 1 addition & 0 deletions src/daft-dsl/src/optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub fn requires_computation(e: &Expr) -> bool {
| Expr::Function { .. }
| Expr::Not(..)
| Expr::IsNull(..)
| Expr::NotNull(..)
| Expr::IfElse { .. } => true,
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/daft-dsl/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ impl PyExpr {
Ok(self.expr.is_null().into())
}

/// Python-facing `not_null`: builds the `not_null` expression over the
/// wrapped expression and wraps it again. This never returns `Err`.
pub fn not_null(&self) -> PyResult<Self> {
    let not_null_expr = self.expr.not_null();
    Ok(not_null_expr.into())
}

pub fn name(&self) -> PyResult<&str> {
Ok(self.expr.name()?)
}
Expand Down
3 changes: 2 additions & 1 deletion src/daft-dsl/src/treenode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ impl TreeNode for Expr {
{
use Expr::*;
let children = match self {
Alias(expr, _) | Cast(expr, _) | Not(expr) | IsNull(expr) => {
Alias(expr, _) | Cast(expr, _) | Not(expr) | IsNull(expr) | NotNull(expr) => {
vec![expr.as_ref()]
}
Agg(agg_expr) => {
Expand Down Expand Up @@ -69,6 +69,7 @@ impl TreeNode for Expr {
}
Not(expr) => Not(transform(expr.as_ref().clone())?.into()),
IsNull(expr) => IsNull(transform(expr.as_ref().clone())?.into()),
NotNull(expr) => NotNull(transform(expr.as_ref().clone())?.into()),
IfElse {
if_true,
if_false,
Expand Down
7 changes: 7 additions & 0 deletions src/daft-plan/src/logical_ops/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ fn replace_column_with_semantic_id(
|_| e,
)
}
Expr::NotNull(child) => {
replace_column_with_semantic_id(child.clone(), subexprs_to_replace, schema)
.map_yes_no(
|transformed_child| Expr::NotNull(transformed_child).into(),
|_| e,
)
}
Expr::BinaryOp { op, left, right } => {
let left =
replace_column_with_semantic_id(left.clone(), subexprs_to_replace, schema);
Expand Down
7 changes: 7 additions & 0 deletions src/daft-plan/src/physical_ops/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ impl Project {
)?;
Ok(Expr::IsNull(newchild.into()))
}
Expr::NotNull(child) => {
let newchild = Self::translate_partition_spec_expr(
child.as_ref(),
old_colname_to_new_colname,
)?;
Ok(Expr::NotNull(newchild.into()))
}
Expr::IfElse {
if_true,
if_false,
Expand Down
Loading
Loading