Skipping partial aggregation when it is not helping for high cardinality aggregates #11627

Merged
merged 5 commits on Aug 5, 2024
9 changes: 9 additions & 0 deletions datafusion/common/src/config.rs
@@ -324,6 +324,15 @@ config_namespace! {

/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false

/// Aggregation ratio (number of distinct groups / number of input rows)
/// threshold for skipping partial aggregation. If the measured ratio exceeds
/// this threshold, partial aggregation will skip aggregation for further input rows
pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

/// Number of input rows a partial aggregation partition should process before
/// checking the aggregation ratio and attempting to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
}
}
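As a quick illustration, here is a minimal sketch of setting these thresholds programmatically, assuming the options sit in the `ExecutionOptions` struct (`datafusion.execution.*`) that the surrounding `config_namespace!` hunk appears to extend; the values shown are arbitrary:

```rust
use datafusion_common::config::ConfigOptions;

// Sketch only: tune the probe thresholds before building a session.
fn tuned_options() -> ConfigOptions {
    let mut options = ConfigOptions::new();
    // Switch to skipping aggregation once half of the probed rows are distinct groups...
    options.execution.skip_partial_aggregation_probe_ratio_threshold = 0.5;
    // ...but only after the partial operator has probed at least 50_000 input rows.
    options.execution.skip_partial_aggregation_probe_rows_threshold = 50_000;
    options
}
```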

105 changes: 103 additions & 2 deletions datafusion/expr/src/accumulator.rs
@@ -94,7 +94,7 @@ pub trait Accumulator: Send + Sync + Debug {
///
/// Intermediate state is used for "multi-phase" grouping in
/// DataFusion, where an aggregate is computed in parallel with
/// multiple `Accumulator` instances, as illustrated below:
/// multiple `Accumulator` instances, as described below:
///
/// # MultiPhase Grouping
///
@@ -130,7 +130,7 @@ pub trait Accumulator: Send + Sync + Debug {
/// `───────' `───────'
/// ```
///
/// The partial state is serialied as `Arrays` and then combined
/// The partial state is serialized as `Arrays` and then combined
/// with other partial states from different instances of this
/// Accumulator (that ran on different partitions, for example).
///
@@ -147,6 +147,107 @@
/// Note that [`ScalarValue::List`] can be used to pass multiple
/// values if the number of intermediate values is not known at
/// planning time (e.g. for `MEDIAN`)
///
/// # Multi-phase repartitioned Grouping
///
/// Many multi-phase grouping plans contain a Repartition operation
/// as well as shown below:
///
/// ```text
/// ▲ ▲
/// │ │
/// │ │
/// │ │
/// │ │
/// │ │
/// ┌───────────────────────┐ ┌───────────────────────┐ 4. Each AggregateMode::Final
/// │GroupBy │ │GroupBy │ GroupBy has an entry for its
/// │(AggregateMode::Final) │ │(AggregateMode::Final) │ subset of groups (in this case
/// │ │ │ │ that means half the entries)
/// └───────────────────────┘ └───────────────────────┘
/// ▲ ▲
/// │ │
/// └─────────────┬────────────┘
/// │
/// │
/// │
/// ┌─────────────────────────┐ 3. Repartitioning by hash(group
/// │ Repartition │ keys) ensures that each distinct
/// │ HASH(x) │ group key now appears in exactly
/// └─────────────────────────┘ one partition
/// ▲
/// │
/// ┌───────────────┴─────────────┐
/// │ │
/// │ │
/// ┌─────────────────────────┐ ┌──────────────────────────┐ 2. Each AggregateMode::Partial
/// │ GroupBy │ │ GroupBy │ GroupBy has an entry for *all*
/// │(AggregateMode::Partial) │ │ (AggregateMode::Partial) │ the groups
/// └─────────────────────────┘ └──────────────────────────┘
/// ▲ ▲
/// │ ┌┘
/// │ │
/// .─────────. .─────────.
/// ,─' '─. ,─' '─.
/// ; Input : ; Input : 1. Since input data is
/// : Partition 0 ; : Partition 1 ; arbitrarily or RoundRobin
/// ╲ ╱ ╲ ╱ distributed, each partition
/// '─. ,─' '─. ,─' likely has all distinct
/// `───────' `───────'
/// ```
///
/// This structure is used so that the `AggregateMode::Partial` accumulators
/// reduce the cardinality of the input as soon as possible. Typically,
/// each partial accumulator sees all groups in the input as the group keys
/// are evenly distributed across the input.
///
/// The final output is computed by repartitioning the result of
/// [`Self::state`] from each Partial aggregate by `hash(group keys)` so
/// that each distinct group key appears in exactly one of the
/// `AggregateMode::Final` GroupBy nodes. The outputs of the final nodes are
/// then unioned together to produce the overall final output.
///
/// Here is an example that shows the distribution of groups in the
/// different phases
///
/// ```text
/// ┌─────┐ ┌─────┐
/// │ 1 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 2 │ │ 4 │ After repartitioning by
/// └─────┘ └─────┘ hash(group keys), each distinct
/// ┌─────┐ ┌─────┐ group key now appears in exactly
/// │ 1 │ │ 3 │ one partition
/// ├─────┤ ├─────┤
/// │ 2 │ │ 4 │
/// └─────┘ └─────┘
///
///
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///
/// ┌─────┐ ┌─────┐
/// │ 2 │ │ 2 │
/// ├─────┤ ├─────┤
/// │ 1 │ │ 2 │
/// ├─────┤ ├─────┤
/// │ 3 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 4 │ │ 1 │
/// └─────┘ └─────┘ Input data is arbitrarily or
/// ... ... RoundRobin distributed, each
/// ┌─────┐ ┌─────┐ partition likely has all
/// │ 1 │ │ 4 │ distinct group keys
/// ├─────┤ ├─────┤
/// │ 4 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 1 │ │ 1 │
/// ├─────┤ ├─────┤
/// │ 4 │ │ 3 │
/// └─────┘ └─────┘
///
/// group values group values
/// in partition 0 in partition 1
/// ```
fn state(&mut self) -> Result<Vec<ScalarValue>>;

/// Updates the accumulator's state from an `Array` containing one
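To make the two-phase flow above concrete, here is a minimal sketch (not DataFusion's actual `AVG` code; `AvgSketch` and its methods are hypothetical) of how an average-style accumulator could expose its intermediate state in the Partial phase and merge it in the Final phase:

```rust
use arrow_array::{Array, ArrayRef, Float64Array, UInt64Array};
use datafusion_common::{Result, ScalarValue};

/// Illustrative only: the intermediate state of an average is (sum, count).
#[derive(Debug, Default)]
struct AvgSketch {
    sum: f64,
    count: u64,
}

impl AvgSketch {
    /// Partial phase: serialize the intermediate state as two scalar values,
    /// mirroring what `Accumulator::state` returns.
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        Ok(vec![
            ScalarValue::Float64(Some(self.sum)),
            ScalarValue::UInt64(Some(self.count)),
        ])
    }

    /// Final phase: combine (sum, count) state arrays produced by Partial
    /// accumulators on other partitions, mirroring `Accumulator::merge_batch`.
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        let sums = states[0].as_any().downcast_ref::<Float64Array>().unwrap();
        let counts = states[1].as_any().downcast_ref::<UInt64Array>().unwrap();
        for i in 0..sums.len() {
            if !sums.is_null(i) {
                self.sum += sums.value(i);
                self.count += counts.value(i);
            }
        }
        Ok(())
    }
}
```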
66 changes: 61 additions & 5 deletions datafusion/expr/src/groups_accumulator.rs
@@ -18,7 +18,7 @@
//! Vectorized [`GroupsAccumulator`]

use arrow_array::{ArrayRef, BooleanArray};
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Result};

/// Describes how many rows should be emitted during grouping.
#[derive(Debug, Clone, Copy)]
@@ -128,18 +128,23 @@ pub trait GroupsAccumulator: Send {
/// Returns the intermediate aggregate state for this accumulator,
/// used for multi-phase grouping, resetting its internal state.
///
/// See [`Accumulator::state`] for more information on multi-phase
/// aggregation.
///
/// For example, `AVG` might return two arrays: `SUM` and `COUNT`
/// but the `MIN` aggregate would just return a single array.
///
/// Note more sophisticated internal state can be passed as
/// single `StructArray` rather than multiple arrays.
///
/// See [`Self::evaluate`] for details on the required output
/// order and `emit_to`.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

/// Merges intermediate state (the output from [`Self::state`])
/// into this accumulator's values.
/// into this accumulator's current state.
///
/// For some aggregates (such as `SUM`), `merge_batch` is the same
/// as `update_batch`, but for some aggregates (such as `COUNT`,
@@ -158,8 +163,59 @@
total_num_groups: usize,
) -> Result<()>;

/// Converts an input batch directly into the intermediate aggregate state.
///
/// This is the equivalent of treating each input row as its own group. It
/// is invoked when the Partial phase of a multi-phase aggregation is not
/// reducing the cardinality enough to warrant spending more effort on
/// pre-aggregation (see `Background` section below), and switches to
/// passing intermediate state directly on to the next aggregation phase.
///
/// Examples:
/// * `COUNT`: an array of 1s for each row in the input batch.
/// * `SUM/MIN/MAX`: the input values themselves.
///
/// # Arguments
/// * `values`: the input arguments to the accumulator
/// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
///
/// # Background
///
/// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
/// Partial phase reduces the cardinality of the input data as soon as
/// possible in the plan.
///
/// This strategy is very effective for queries with a small number of
/// groups, as most of the data is aggregated immediately and only a small
/// amount of data must be repartitioned (see [`Accumulator::state`] for
/// background)
///
/// However, for queries with a large number of groups, the Partial phase
/// often does not reduce the cardinality enough to warrant the memory and
/// CPU cost of actually performing the aggregation. For such cases, the
/// HashAggregate operator will dynamically switch to passing intermediate
/// state directly to the next aggregation phase with minimal processing
/// using this method.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn convert_to_state(
&self,
_values: &[ArrayRef],
_opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
not_impl_err!("Input batch conversion to state not implemented")
}

/// Returns `true` if [`Self::convert_to_state`] is implemented to support
/// intermediate aggregate state conversion.
fn supports_convert_to_state(&self) -> bool {
false
}

/// Amount of memory used to store the state of this accumulator,
/// in bytes. This function is called once per batch, so it should
/// be `O(n)` to compute, not `O(num_groups)`
/// in bytes.
///
/// This function is called once per batch, so it should be `O(n)` to
/// compute, not `O(num_groups)`
fn size(&self) -> usize;
}
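As a rough sketch of the `COUNT` example mentioned above (illustrative only, not the `Count` implementation added by this PR; `count_convert_to_state` is a hypothetical free function), converting a batch to state amounts to emitting a 1 per surviving row:

```rust
use arrow_array::{Array, ArrayRef, BooleanArray, Int64Array};
use datafusion_common::Result;
use std::sync::Arc;

/// Hypothetical sketch: each input row becomes its own group, so the COUNT
/// state is 1 per row, or 0 where the row is null or filtered out.
fn count_convert_to_state(
    values: &[ArrayRef],
    opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
    let input = &values[0];
    let counts: Vec<i64> = (0..input.len())
        .map(|i| {
            // For simplicity, this sketch ignores the validity (nulls) of the filter itself.
            let kept = opt_filter.map(|f| f.value(i)).unwrap_or(true);
            if kept && !input.is_null(i) {
                1
            } else {
                0
            }
        })
        .collect();
    Ok(vec![Arc::new(Int64Array::from(counts)) as ArrayRef])
}
```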
4 changes: 3 additions & 1 deletion datafusion/expr/src/udaf.rs
@@ -351,6 +351,8 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {

/// Return the fields used to store the intermediate state of this accumulator.
///
/// See [`Accumulator::state`] for background information.
///
/// args: [`StateFieldsArgs`] contains arguments passed to the
/// aggregate function's accumulator.
///
@@ -388,7 +390,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// # Notes
///
/// Even if this function returns true, DataFusion will still use
/// `Self::accumulator` for certain queries, such as when this aggregate is
/// [`Self::accumulator`] for certain queries, such as when this aggregate is
/// used as a window function or when there are no GROUP BY columns in the
/// query.
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
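For example, here is a hedged sketch of the state fields a hypothetical AVG-like UDAF named `my_avg` might return, matching an accumulator whose `state()` yields a sum and a count (names and types are illustrative, not part of this PR):

```rust
use arrow_schema::{DataType, Field};

/// Illustrative only: one Field per intermediate value returned by `state()`.
fn my_avg_state_fields() -> Vec<Field> {
    vec![
        Field::new("my_avg[sum]", DataType::Float64, true),
        Field::new("my_avg[count]", DataType::UInt64, true),
    ]
}
```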
10 changes: 10 additions & 0 deletions datafusion/functions-aggregate/Cargo.toml
@@ -50,4 +50,14 @@ paste = "1.0.14"
sqlparser = { workspace = true }

[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
criterion = "0.5"
rand = { workspace = true }

[[bench]]
name = "count"
harness = false

[[bench]]
name = "sum"
harness = false
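With these targets declared, the new micro-benchmarks can be run from the `datafusion/functions-aggregate` crate with `cargo bench --bench count` or `cargo bench --bench sum`.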
98 changes: 98 additions & 0 deletions datafusion/functions-aggregate/benches/count.rs
@@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, BooleanArray};
use arrow::datatypes::Int32Type;
use arrow::util::bench_util::{create_boolean_array, create_primitive_array};
use arrow_schema::{DataType, Field, Schema};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_common::DFSchema;
use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator};
use datafusion_functions_aggregate::count::Count;
use std::sync::Arc;

fn prepare_accumulator() -> Box<dyn GroupsAccumulator> {
let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Int32, true)]));
let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
let accumulator_args = AccumulatorArgs {
data_type: &DataType::Int64,
schema: &schema,
dfschema: &df_schema,
ignore_nulls: false,
sort_exprs: &[],
is_reversed: false,
name: "COUNT(f)",
is_distinct: false,
input_types: &[DataType::Int32],
input_exprs: &[datafusion_expr::col("f")],
};
let count_fn = Count::new();

count_fn
.create_groups_accumulator(accumulator_args)
.unwrap()
}

fn convert_to_state_bench(
c: &mut Criterion,
name: &str,
values: ArrayRef,
opt_filter: Option<&BooleanArray>,
) {
let accumulator = prepare_accumulator();
c.bench_function(name, |b| {
b.iter(|| {
black_box(
accumulator
.convert_to_state(&[values.clone()], opt_filter)
.unwrap(),
)
})
});
}

fn count_benchmark(c: &mut Criterion) {
let values = Arc::new(create_primitive_array::<Int32Type>(8192, 0.0)) as ArrayRef;
convert_to_state_bench(c, "count convert state no nulls, no filter", values, None);

let values = Arc::new(create_primitive_array::<Int32Type>(8192, 0.3)) as ArrayRef;
convert_to_state_bench(c, "count convert state 30% nulls, no filter", values, None);

let values = Arc::new(create_primitive_array::<Int32Type>(8192, 0.7)) as ArrayRef;
convert_to_state_bench(c, "count convert state 70% nulls, no filter", values, None);

let values = Arc::new(create_primitive_array::<Int32Type>(8192, 0.0)) as ArrayRef;
let filter = create_boolean_array(8192, 0.0, 0.5);
convert_to_state_bench(
c,
"count convert state no nulls, filter",
values,
Some(&filter),
);

let values = Arc::new(create_primitive_array::<Int32Type>(8192, 0.3)) as ArrayRef;
let filter = create_boolean_array(8192, 0.0, 0.5);
convert_to_state_bench(
c,
"count convert state nulls, filter",
values,
Some(&filter),
);
}

criterion_group!(benches, count_benchmark);
criterion_main!(benches);