Skip to content

Commit

Permalink
Add PRIMARY KEY Aggregate support to dataframe API (apache#8356)
Browse files Browse the repository at this point in the history
* Aggregate rewrite for dataframe API.

* Simplifications

* Minor changes

* Minor changes

* Add new test

* Add new tests

* Minor changes

* Add rule, for aggregate simplification

* Simplifications

* Simplifications

* Simplifications

* Minor changes

* Simplifications

* Add new test condition

* Tmp

* Push requirement below aggregate

* Add join and subquery alias

* Add cross join support

* Minor changes

* Add logical plan repartition support

* Add union support

* Add table scan

* Add limit

* Minor changes, buggy

* Add new tests, fix existing bugs

* change concat type array_concat

* Resolve some of the bugs

* Comment out a rule

* All tests pass, when single distinct is closed

* Fix aggregate bug

* Change analyze and explain implementations

* All tests pass

* Resolve linter errors

* Simplifications, remove unnecessary code

* Comment out tests

* Remove pushdown projection

* Pushdown empty projections

* Fix failing tests

* Simplifications

* Update comments, simplifications

* Remove eliminate projection rule, Add method for group expr len aggregate

* Simplifications, subquery support

* Update comments, add unnest support, simplifications

* Remove eliminate projection pass

* Change name

* Minor changes

* Minor changes

* Add comments

* Fix failing test

* Minor simplifications

* update

* Minor

* Remove ordering

* Minor changes

* add merge projections

* Add comments, resolve linter errors

* Minor changes

* Minor changes

* Minor changes

* Minor changes

* Minor changes

* Minor changes

* Minor changes

* Minor changes

* Review Part 1

* Review Part 2

* Fix quadratic search, Change trim_expr impl

* Review Part 3

* Address reviews

* Minor changes

* Review Part 4

* Add case expr support

* Review Part 5

* Review Part 6

* Finishing touch: Improve comments

---------

Co-authored-by: berkaysynnada <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
3 people authored and appletreeisyellow committed Dec 15, 2023
1 parent ea6ab10 commit ebdc7da
Show file tree
Hide file tree
Showing 14 changed files with 1,349 additions and 611 deletions.
13 changes: 10 additions & 3 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,16 @@ impl DFSchema {
pub fn with_functional_dependencies(
mut self,
functional_dependencies: FunctionalDependencies,
) -> Self {
self.functional_dependencies = functional_dependencies;
self
) -> Result<Self> {
if functional_dependencies.is_valid(self.fields.len()) {
self.functional_dependencies = functional_dependencies;
Ok(self)
} else {
_plan_err!(
"Invalid functional dependency: {:?}",
functional_dependencies
)
}
}

/// Create a new schema that contains the fields from this schema followed by the fields
Expand Down
117 changes: 103 additions & 14 deletions datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::ops::Deref;
use std::vec::IntoIter;

use crate::error::_plan_err;
use crate::utils::{merge_and_order_indices, set_difference};
use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};

use sqlparser::ast::TableConstraint;
Expand Down Expand Up @@ -271,6 +272,29 @@ impl FunctionalDependencies {
self.deps.extend(other.deps);
}

/// Sanity checks if functional dependencies are valid. For example, if
/// there are 10 fields, we cannot receive any index further than 9.
pub fn is_valid(&self, n_field: usize) -> bool {
self.deps.iter().all(
|FunctionalDependence {
source_indices,
target_indices,
..
}| {
source_indices
.iter()
.max()
.map(|&max_index| max_index < n_field)
.unwrap_or(true)
&& target_indices
.iter()
.max()
.map(|&max_index| max_index < n_field)
.unwrap_or(true)
},
)
}

/// Adds the `offset` value to `source_indices` and `target_indices` for
/// each functional dependency.
pub fn add_offset(&mut self, offset: usize) {
Expand Down Expand Up @@ -442,44 +466,56 @@ pub fn aggregate_functional_dependencies(
} in &func_dependencies.deps
{
// Keep source indices in a `HashSet` to prevent duplicate entries:
let mut new_source_indices = HashSet::new();
let mut new_source_indices = vec![];
let mut new_source_field_names = vec![];
let source_field_names = source_indices
.iter()
.map(|&idx| aggr_input_fields[idx].qualified_name())
.collect::<Vec<_>>();

for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() {
// When one of the input determinant expressions matches with
// the GROUP BY expression, add the index of the GROUP BY
// expression as a new determinant key:
if source_field_names.contains(group_by_expr_name) {
new_source_indices.insert(idx);
new_source_indices.push(idx);
new_source_field_names.push(group_by_expr_name.clone());
}
}
let existing_target_indices =
get_target_functional_dependencies(aggr_input_schema, group_by_expr_names);
let new_target_indices = get_target_functional_dependencies(
aggr_input_schema,
&new_source_field_names,
);
let mode = if existing_target_indices == new_target_indices
&& new_target_indices.is_some()
{
// If dependency covers all GROUP BY expressions, mode will be `Single`:
Dependency::Single
} else {
// Otherwise, existing mode is preserved:
*mode
};
// All of the composite indices occur in the GROUP BY expression:
if new_source_indices.len() == source_indices.len() {
aggregate_func_dependencies.push(
FunctionalDependence::new(
new_source_indices.into_iter().collect(),
new_source_indices,
target_indices.clone(),
*nullable,
)
// input uniqueness stays the same when GROUP BY matches with input functional dependence determinants
.with_mode(*mode),
.with_mode(mode),
);
}
}

// If we have a single GROUP BY key, we can guarantee uniqueness after
// aggregation:
if group_by_expr_names.len() == 1 {
// If `source_indices` contain 0, delete this functional dependency
// as it will be added anyway with mode `Dependency::Single`:
if let Some(idx) = aggregate_func_dependencies
.iter()
.position(|item| item.source_indices.contains(&0))
{
// Delete the functional dependency that contains zeroth idx:
aggregate_func_dependencies.remove(idx);
}
aggregate_func_dependencies.retain(|item| !item.source_indices.contains(&0));
// Add a new functional dependency associated with the whole table:
aggregate_func_dependencies.push(
// Use nullable property of the group by expression
Expand Down Expand Up @@ -527,8 +563,61 @@ pub fn get_target_functional_dependencies(
combined_target_indices.extend(target_indices.iter());
}
}
(!combined_target_indices.is_empty())
.then_some(combined_target_indices.iter().cloned().collect::<Vec<_>>())
(!combined_target_indices.is_empty()).then_some({
let mut result = combined_target_indices.into_iter().collect::<Vec<_>>();
result.sort();
result
})
}

/// Returns indices for the minimal subset of GROUP BY expressions that are
/// functionally equivalent to the original set of GROUP BY expressions.
pub fn get_required_group_by_exprs_indices(
    schema: &DFSchema,
    group_by_expr_names: &[String],
) -> Option<Vec<usize>> {
    let dependencies = schema.functional_dependencies();
    // Qualified name of every schema field, in schema order.
    let field_names: Vec<String> = schema
        .fields()
        .iter()
        .map(|field| field.qualified_name())
        .collect();
    // Translate each GROUP BY expression name into its schema index.
    // Returns `None` (via `?`) if any expression does not correspond
    // to a schema field.
    let mut required_indices = Vec::with_capacity(group_by_expr_names.len());
    for expr_name in group_by_expr_names {
        let field_idx = field_names
            .iter()
            .position(|field_name| field_name == expr_name)?;
        required_indices.push(field_idx);
    }
    required_indices.sort();
    // Shrink the index set using functional dependencies: when all of a
    // dependency's determinant (source) indices already appear among the
    // GROUP BY indices, the dependent (target) indices are redundant and
    // can be replaced by the determinants.
    for dependence in &dependencies.deps {
        let determinants_covered = dependence
            .source_indices
            .iter()
            .all(|source_idx| required_indices.contains(source_idx));
        if determinants_covered {
            required_indices =
                set_difference(&required_indices, &dependence.target_indices);
            required_indices =
                merge_and_order_indices(required_indices, &dependence.source_indices);
        }
    }
    // Map the surviving schema indices back to positions within the
    // original GROUP BY expression list.
    required_indices
        .iter()
        .map(|schema_idx| {
            group_by_expr_names
                .iter()
                .position(|name| &field_names[*schema_idx] == name)
        })
        .collect()
}

/// Updates entries inside the `entries` vector with their corresponding
Expand Down
5 changes: 3 additions & 2 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ pub use file_options::file_type::{
};
pub use file_options::FileTypeWriterOptions;
pub use functional_dependencies::{
aggregate_functional_dependencies, get_target_functional_dependencies, Constraint,
Constraints, Dependency, FunctionalDependence, FunctionalDependencies,
aggregate_functional_dependencies, get_required_group_by_exprs_indices,
get_target_functional_dependencies, Constraint, Constraints, Dependency,
FunctionalDependence, FunctionalDependencies,
};
pub use join_type::{JoinConstraint, JoinSide, JoinType};
pub use param_value::ParamValues;
Expand Down
Loading

0 comments on commit ebdc7da

Please sign in to comment.