Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/test_for_pushdown
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Sep 7, 2024
2 parents bbe30e0 + ddbdf4b commit 10d2242
Show file tree
Hide file tree
Showing 27 changed files with 1,054 additions and 135 deletions.
88 changes: 69 additions & 19 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1321,14 +1321,6 @@ impl ScalarValue {
}
}

/// Getter for the `DataType` of the value.
///
/// Deprecated since 31.0.0: this method only forwards to
/// [`Self::data_type`] and returns whatever that call returns. Prefer
/// calling [`Self::data_type`] directly, as it is the more standard API.
#[deprecated(since = "31.0.0", note = "use data_type instead")]
pub fn get_datatype(&self) -> DataType {
self.data_type()
}

/// Calculate arithmetic negation for a scalar value
pub fn arithmetic_negate(&self) -> Result<Self> {
fn neg_checked_with_ctx<T: ArrowNativeTypeOp>(
Expand Down Expand Up @@ -3661,18 +3653,20 @@ fn fmt_list(arr: ArrayRef, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{value_formatter}")
}

/// Writes `data` to the formatter as comma-separated decimal byte values.
///
/// For example `[1, 2, 3]` is rendered as `1,2,3`. An empty slice writes
/// nothing, and no trailing separator is ever emitted.
fn fmt_binary(data: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
    match data.split_first() {
        // Nothing to print for an empty byte array.
        None => Ok(()),
        Some((head, tail)) => {
            // The first byte has no separator; every later byte is
            // prefixed with one.
            write!(f, "{head}")?;
            tail.iter().try_for_each(|byte| write!(f, ",{byte}"))
        }
    }
}

impl fmt::Debug for ScalarValue {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt_binary(data: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{}",
data.iter()
.map(|v| format!("{v}"))
.collect::<Vec<_>>()
.join(",")
)
}
match self {
ScalarValue::Decimal128(_, _, _) => write!(f, "Decimal128({self})"),
ScalarValue::Decimal256(_, _, _) => write!(f, "Decimal256({self})"),
Expand Down Expand Up @@ -3722,7 +3716,7 @@ impl fmt::Debug for ScalarValue {
write!(f, "FixedSizeBinary({size}, {self})")
}
ScalarValue::FixedSizeBinary(size, Some(b)) => {
write!(f, "Binary({size}, \"")?;
write!(f, "FixedSizeBinary({size}, \"")?;
fmt_binary(b.as_slice(), f)?;
write!(f, "\")")
}
Expand Down Expand Up @@ -6667,6 +6661,8 @@ mod tests {
fn test_binary_display() {
let no_binary_value = ScalarValue::Binary(None);
assert_eq!(format!("{no_binary_value}"), "NULL");
let single_binary_value = ScalarValue::Binary(Some(vec![42u8]));
assert_eq!(format!("{single_binary_value}"), "2A");
let small_binary_value = ScalarValue::Binary(Some(vec![1u8, 2, 3]));
assert_eq!(format!("{small_binary_value}"), "010203");
let large_binary_value =
Expand Down Expand Up @@ -6700,6 +6696,60 @@ mod tests {
assert_eq!(format!("{large_binary_value}"), "0102030405060708090A...");
}

#[test]
fn test_binary_debug() {
    // Byte payloads shared by every binary flavour under test.
    let short = || vec![1u8, 2, 3];
    let long = || vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];

    // Each entry pairs a scalar with the exact text its `Debug` impl must
    // produce. Entries are checked in the order listed.
    let cases = [
        (ScalarValue::Binary(None), "Binary(NULL)"),
        (ScalarValue::Binary(Some(vec![42u8])), "Binary(\"42\")"),
        (ScalarValue::Binary(Some(short())), "Binary(\"1,2,3\")"),
        (
            ScalarValue::Binary(Some(long())),
            "Binary(\"1,2,3,4,5,6,7,8,9,10,11\")",
        ),
        (ScalarValue::BinaryView(None), "BinaryView(NULL)"),
        (
            ScalarValue::BinaryView(Some(short())),
            "BinaryView(\"1,2,3\")",
        ),
        (
            ScalarValue::BinaryView(Some(long())),
            "BinaryView(\"1,2,3,4,5,6,7,8,9,10,11\")",
        ),
        (ScalarValue::LargeBinary(None), "LargeBinary(NULL)"),
        (
            ScalarValue::LargeBinary(Some(short())),
            "LargeBinary(\"1,2,3\")",
        ),
        (
            ScalarValue::LargeBinary(Some(long())),
            "LargeBinary(\"1,2,3,4,5,6,7,8,9,10,11\")",
        ),
        (
            ScalarValue::FixedSizeBinary(3, None),
            "FixedSizeBinary(3, NULL)",
        ),
        (
            ScalarValue::FixedSizeBinary(3, Some(short())),
            "FixedSizeBinary(3, \"1,2,3\")",
        ),
        (
            ScalarValue::FixedSizeBinary(11, Some(long())),
            "FixedSizeBinary(11, \"1,2,3,4,5,6,7,8,9,10,11\")",
        ),
    ];

    for (value, expected) in cases {
        assert_eq!(format!("{value:?}"), expected);
    }
}

#[test]
fn test_build_timestamp_millisecond_list() {
let values = vec![ScalarValue::TimestampMillisecond(Some(1), None)];
Expand Down
18 changes: 8 additions & 10 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,21 +318,20 @@ impl NamePreserver {
Self { use_alias: true }
}

pub fn save(&self, expr: &Expr) -> Result<SavedName> {
let original_name = if self.use_alias {
pub fn save(&self, expr: &Expr) -> SavedName {
if self.use_alias {
let (relation, name) = expr.qualified_name();
SavedName::Saved { relation, name }
} else {
SavedName::None
};
Ok(original_name)
}
}
}

impl SavedName {
/// Ensures the qualified name of the rewritten expression is preserved
pub fn restore(self, expr: Expr) -> Result<Expr> {
let expr = match self {
pub fn restore(self, expr: Expr) -> Expr {
match self {
SavedName::Saved { relation, name } => {
let (new_relation, new_name) = expr.qualified_name();
if new_relation != relation || new_name != name {
Expand All @@ -342,8 +341,7 @@ impl SavedName {
}
}
SavedName::None => expr,
};
Ok(expr)
}
}
}

Expand Down Expand Up @@ -543,9 +541,9 @@ mod test {
let mut rewriter = TestRewriter {
rewrite_to: rewrite_to.clone(),
};
let saved_name = NamePreserver { use_alias: true }.save(&expr_from).unwrap();
let saved_name = NamePreserver { use_alias: true }.save(&expr_from);
let new_expr = expr_from.clone().rewrite(&mut rewriter).unwrap().data;
let new_expr = saved_name.restore(new_expr).unwrap();
let new_expr = saved_name.restore(new_expr);

let original_name = expr_from.qualified_name();
let new_name = new_expr.qualified_name();
Expand Down
36 changes: 36 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,19 @@ impl LogicalPlanBuilder {
.map(Self::new)
}

/// Build a [`LogicalPlanBuilder`] whose plan is a single `TableScan` over
/// `table_source`, carrying the given filters and fetch.
///
/// All arguments are forwarded unchanged to `TableScan::try_new`.
///
/// # Errors
///
/// Returns whatever error `TableScan::try_new` reports.
pub fn scan_with_filters_fetch(
    table_name: impl Into<TableReference>,
    table_source: Arc<dyn TableSource>,
    projection: Option<Vec<usize>>,
    filters: Vec<Expr>,
    fetch: Option<usize>,
) -> Result<Self> {
    let scan = TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
    Ok(Self::new(LogicalPlan::TableScan(scan)))
}

/// Wrap a plan in a window
pub fn window_plan(
input: LogicalPlan,
Expand Down Expand Up @@ -1424,6 +1437,29 @@ pub fn table_scan_with_filters(
LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
}

/// Create a [`LogicalPlanBuilder`] that scans a table with the given name and
/// schema, applying the supplied filters and an inlined fetch.
///
/// When `name` is `None` the scan is registered under `UNNAMED_TABLE`.
/// This is mostly used for testing and documentation.
pub fn table_scan_with_filter_and_fetch(
    name: Option<impl Into<TableReference>>,
    table_schema: &Schema,
    projection: Option<Vec<usize>>,
    filters: Vec<Expr>,
    fetch: Option<usize>,
) -> Result<LogicalPlanBuilder> {
    // Wrap the schema in a table source before resolving the name.
    let source = table_source(table_schema);
    let table_ref: TableReference = match name {
        Some(given) => given.into(),
        None => TableReference::bare(UNNAMED_TABLE),
    };
    LogicalPlanBuilder::scan_with_filters_fetch(table_ref, source, projection, filters, fetch)
}

fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
let table_schema = Arc::new(table_schema.clone());
Arc::new(LogicalTableSource { table_schema })
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ impl LogicalPlan {
let schema = Arc::clone(plan.schema());
let name_preserver = NamePreserver::new(&plan);
plan.map_expressions(|e| {
let original_name = name_preserver.save(&e)?;
let original_name = name_preserver.save(&e);
let transformed_expr =
e.infer_placeholder_types(&schema)?.transform_up(|e| {
if let Expr::Placeholder(Placeholder { id, .. }) = e {
Expand All @@ -1452,7 +1452,7 @@ impl LogicalPlan {
}
})?;
// Preserve name to avoid breaking column references to this expression
transformed_expr.map_data(|expr| original_name.restore(expr))
Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
})
})
.map(|res| res.data)
Expand Down
11 changes: 6 additions & 5 deletions datafusion/functions-nested/src/array_has.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use arrow_array::{Datum, GenericListArray, Scalar};
use datafusion_common::cast::as_generic_list_array;
use datafusion_common::utils::string_utils::string_array_to_vec;
use datafusion_common::{exec_err, Result, ScalarValue};
use datafusion_expr::{ColumnarValue, Operator, ScalarUDFImpl, Signature, Volatility};

use datafusion_physical_expr_common::datum::compare_op_for_nested;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use datafusion_physical_expr_common::datum::compare_with_eq;
use itertools::Itertools;

use crate::utils::make_scalar_function;
Expand Down Expand Up @@ -180,8 +179,9 @@ fn array_has_dispatch_for_array<O: OffsetSizeTrait>(
continue;
}
let arr = arr.unwrap();
let is_nested = arr.data_type().is_nested();
let needle_row = Scalar::new(needle.slice(i, 1));
let eq_array = compare_op_for_nested(Operator::Eq, &arr, &needle_row)?;
let eq_array = compare_with_eq(&arr, &needle_row, is_nested)?;
let is_contained = eq_array.true_count() > 0;
boolean_builder.append_value(is_contained)
}
Expand All @@ -195,13 +195,14 @@ fn array_has_dispatch_for_scalar<O: OffsetSizeTrait>(
) -> Result<ArrayRef> {
let haystack = as_generic_list_array::<O>(haystack)?;
let values = haystack.values();
let is_nested = values.data_type().is_nested();
let offsets = haystack.value_offsets();
// If first argument is empty list (second argument is non-null), return false
// i.e. array_has([], non-null element) -> false
if values.len() == 0 {
return Ok(Arc::new(BooleanArray::from(vec![Some(false)])));
}
let eq_array = compare_op_for_nested(Operator::Eq, values, needle)?;
let eq_array = compare_with_eq(values, needle, is_nested)?;
let mut final_contained = vec![None; haystack.len()];
for (i, offset) in offsets.windows(2).enumerate() {
let start = offset[0].to_usize().unwrap();
Expand Down
5 changes: 5 additions & 0 deletions datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,8 @@ required-features = ["math_expressions"]
harness = false
name = "substr"
required-features = ["unicode_expressions"]

[[bench]]
harness = false
name = "character_length"
required-features = ["unicode_expressions"]
Loading

0 comments on commit 10d2242

Please sign in to comment.