
Improve the performance of COUNT DISTINCT queries for high cardinality groups #5547

Closed
alamb opened this issue Mar 10, 2023 · 20 comments

@alamb
Contributor

alamb commented Mar 10, 2023

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Queries like this (which compute distinct values for high cardinality columns) are currently relatively slow (if there are many values of UserID):

SELECT 
  SUM(EngineId), 
  COUNT(*) AS c, 
  COUNT(DISTINCT "UserID")
FROM 
  hits 
GROUP BY 
  "RegionID" 
ORDER BY 
  c DESC 
LIMIT 10;

Here is a specific ClickBench query from the discussion on #5276:

SELECT "RegionID", SUM("AdvEngineID"), COUNT(*) AS c, AVG("ResolutionWidth"), COUNT(DISTINCT "UserID") FROM hits GROUP BY "RegionID" ORDER BY c DESC LIMIT 10;

Describe the solution you'd like
We could make this type of query faster. Hopefully we can collect ideas here.

Describe alternatives you've considered
TBD

Additional context
There are thoughts on improving aggregate performance in general #4973

This is one area where ClickHouse and DuckDB are particularly strong.

See #5276 and #5276 (comment)

@alamb alamb added the enhancement New feature or request label Mar 10, 2023
@alamb
Contributor Author

alamb commented Mar 10, 2023

I remember having a discussion about different strategies for this kind of query with @yjshen but now I can't find it

cc @comphead who I think also expressed interest

@alamb
Contributor Author

alamb commented Mar 10, 2023

One suggestion from @jhorstmann:

Rough outline how deduplication could work using existing arrow kernels:

  1. sort the array (this would be the expensive part)
  2. compare the array with a slice of itself starting at offset 1
  3. the resulting bitmap indicates duplicates
  4. negate the bitmap
  5. filter the array using that bitmap
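
As a rough illustration only (not a proposed implementation), a minimal sketch of that outline using arrow kernels might look like the following, assuming an arrow version around 34 where sort, eq_dyn, not, filter, and concat are available; nulls and error handling are glossed over, and distinct_values is a hypothetical helper:

use arrow::array::ArrayRef;
use arrow::compute::kernels::comparison::eq_dyn;
use arrow::compute::{concat, filter, not, sort};
use arrow::error::ArrowError;

/// Hypothetical helper: return the distinct values of `values` via sort + dedup
fn distinct_values(values: &ArrayRef) -> Result<ArrayRef, ArrowError> {
    // 1. sort the array (this would be the expensive part)
    let sorted = sort(values, None)?;
    let n = sorted.len();
    if n <= 1 {
        return Ok(sorted);
    }
    // 2. compare the array with a slice of itself starting at offset 1:
    //    dup[i] is true when sorted[i] == sorted[i + 1]
    let dup = eq_dyn(sorted.slice(0, n - 1).as_ref(), sorted.slice(1, n - 1).as_ref())?;
    // 3. + 4. the bitmap marks duplicates, so negate it to get the "keep" mask
    let keep = not(&dup)?;
    // 5. filter out the duplicates; the first element is always distinct
    let rest = filter(sorted.slice(1, n - 1).as_ref(), &keep)?;
    // COUNT(DISTINCT col) would then just be the length of this result
    concat(&[sorted.slice(0, 1).as_ref(), rest.as_ref()])
}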

@jhorstmann
Contributor

jhorstmann commented Mar 10, 2023

Rough outline how deduplication could work using existing arrow kernels:

The sorting would probably make this more expensive than the existing accumulators. I missed that the question was in a DataFusion context.

I think what is hindering the performance of the DistinctCountAccumulator is the indirections of Vec<ScalarValue> and ScalarValue again containing a string. The many enum values of ScalarValue probably also lead to a complicated eq method.

One way to avoid these indirections would be to use the row format internally in the accumulator. The state of the accumulator would consist of a byte vector that contains serialized keys, and a hashbrown::RawTable that only contains indices into that vector. The hash and eq calculation would then also only work on these byte vectors making them more efficient. Storing all keys in one vector should also improve cache locality.
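
Purely as an illustration (not DataFusion's actual DistinctCountAccumulator), a minimal sketch of that layout might look like the following, assuming hashbrown with the raw-table API and ahash; RowDistinctState and its insert method are hypothetical names, and insert takes already-serialized keys (e.g. row-format encoded values):

// Illustrative sketch: all distinct keys stored back to back in one byte
// buffer, with a hashbrown::RawTable holding only (offset, len) entries into
// that buffer, so hashing and equality work directly on byte slices.
// Requires the hashbrown "raw" feature.
use hashbrown::raw::RawTable;

pub struct RowDistinctState {
    random_state: ahash::RandomState,
    /// serialized keys, stored contiguously for cache locality
    buffer: Vec<u8>,
    /// (offset, len) into `buffer`; hashed and compared via the bytes they point at
    table: RawTable<(usize, usize)>,
}

impl RowDistinctState {
    pub fn new() -> Self {
        Self {
            random_state: ahash::RandomState::new(),
            buffer: Vec::new(),
            table: RawTable::new(),
        }
    }

    /// Insert one serialized key (e.g. a row-format encoded value)
    pub fn insert(&mut self, key: &[u8]) {
        let hash = self.random_state.hash_one(key);
        let buffer = &self.buffer;
        // equality compares the stored byte range against the new key
        if self
            .table
            .get(hash, |(off, len)| &buffer[*off..*off + *len] == key)
            .is_none()
        {
            let offset = self.buffer.len();
            self.buffer.extend_from_slice(key);
            // split the borrows so the rehash closure only captures locals
            let (state, buffer, table) = (&self.random_state, &self.buffer, &mut self.table);
            table.insert(hash, (offset, key.len()), |(off, len)| {
                state.hash_one(&buffer[*off..*off + *len])
            });
        }
    }

    pub fn distinct_count(&self) -> usize {
        self.table.len()
    }
}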

@Dandandan
Contributor

Dandandan commented Mar 10, 2023

Another option, which I suggested earlier, would be similar to the dictionary array builder (https://docs.rs/arrow-array/34.0.0/src/arrow_array/builder/primitive_dictionary_builder.rs.html#82), but without maintaining the duplicated values the way the dictionary does.

This would probably be a bit faster than using the row format (though compared to HashSet<ScalarValue>, both are probably far more efficient), as we don't need to do the conversion, plus hashing on primitive values can be a bit faster and specialized. For COUNT DISTINCT we only need to support single columns.

@alamb
Contributor Author

alamb commented Mar 10, 2023

For distinct aggregates on strings, I think figuring out a way to avoid the overhead of per-value String would be quite important as well. ScalarValue has an owned value for strings.

🤔 I wonder if making ScalarValue::Utf8 use an Arc<str> instead could help

@Dandandan
Contributor

For distinct aggregates on strings, I think figuring out a way to avoid the overhead of per-value String would be quite important as well. ScalarValue has an owned value for strings.

🤔 I wonder if making ScalarValue::Utf8 use an Arc<str> instead could help

Not sure if that makes sense; when doing so, you already need to copy the original string to a new (columnar) array (to avoid keeping the original data in memory). I think it would be better to avoid using ScalarValue altogether if you're already doing so?

@comphead
Contributor

@Dandandan offered an interesting option of trying the Primitive Dictionary Builder in #5472 (comment).
It would be interesting to test it out. As I understand it, the idea is not to use PrimitiveDictionaryBuilder directly but to build a similar, more lightweight structure? Could you elaborate a bit on how you see it?

Another question for Ballista: for distributed COUNT DISTINCT, to get the final aggregated result it should use the same structure, right?

@Dandandan
Contributor

Dandandan commented Mar 10, 2023

@Dandandan offered an interesting option of trying the Primitive Dictionary Builder in #5472 (comment). It would be interesting to test it out. As I understand it, the idea is not to use PrimitiveDictionaryBuilder directly but to build a similar, more lightweight structure? Could you elaborate a bit on how you see it?

Another question for Ballista: for distributed COUNT DISTINCT, to get the final aggregated result it should use the same structure, right?

Yes, we would basically not have to use the keys_builder and keep the rest similar to GenericByteDictionaryBuilder: https://docs.rs/arrow-array/34.0.0/src/arrow_array/builder/generic_bytes_dictionary_builder.rs.html#62

For primitive values we don't even have to use a similar structure, but can use something more like the HashSet<ArrowPrimitiveType::Native> you suggested, which will be close to optimal.

So the structure could look something like:

use std::collections::HashSet;

use arrow_array::builder::GenericByteBuilder;
use arrow_array::types::{ArrowPrimitiveType, ByteArrayType, GenericBinaryType, GenericStringType};
use hashbrown::HashMap;

#[derive(Debug)]
pub struct BinaryDistinctValues<T>
where
    T: ByteArrayType,
{
    state: ahash::RandomState,
    /// Used to provide a lookup from string value to key type
    ///
    /// Note: usize's hash implementation is not used; instead the raw entry
    /// API is used to store keys w.r.t. the hash of the strings themselves
    dedup: HashMap<usize, (), ()>,
    /// Stores the distinct values themselves, back to back
    values_builder: GenericByteBuilder<T>,
}

enum DistinctAggregationState<T: ArrowPrimitiveType> {
    /// primitive values
    Primitive(HashSet<T::Native>),
    Utf8(BinaryDistinctValues<GenericStringType<i32>>),
    Binary(BinaryDistinctValues<GenericBinaryType<i32>>),
}

(note this doesn't support the Struct type)
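
As a small usage sketch for the primitive branch (update_primitive_distinct is a hypothetical helper, not an existing DataFusion function), updating such a HashSet directly from an Arrow batch is what lets it skip ScalarValue entirely:

use std::collections::HashSet;
use std::hash::Hash;

use arrow_array::types::ArrowPrimitiveType;
use arrow_array::PrimitiveArray;

/// Hypothetical helper: fold one batch of primitive values into the distinct set
fn update_primitive_distinct<T: ArrowPrimitiveType>(
    set: &mut HashSet<T::Native>,
    values: &PrimitiveArray<T>,
) where
    T::Native: Hash + Eq,
{
    // `flatten` skips nulls, matching COUNT(DISTINCT col), which ignores NULLs
    for v in values.iter().flatten() {
        set.insert(v);
    }
}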

@yjshen
Member

yjshen commented Mar 10, 2023

I was suggesting using an optimizer rule to rewrite aggregate with distinct into double aggregation to eliminate distinct AggregateExprs for execution.

The gist of the idea is to first move distinct columns as additional grouping columns to compute non-distinct aggregate results, and then use another round of aggregation to compute values for distinct expressions (since they have already been deduplicated in the first aggregation as grouping columns).

I will paste JavaDoc for Spark's RewriteDistinctAggregates below because it contains helpful examples, source here

/**
 * This rule rewrites an aggregate query with distinct aggregations into an expanded double
 * aggregation in which the regular aggregation expressions and every distinct clause is aggregated
 * in a separate group. The results are then combined in a second aggregate.
 *
 * First example: query without filter clauses (in scala):
 * {{{
 *   val data = Seq(
 *     ("a", "ca1", "cb1", 10),
 *     ("a", "ca1", "cb2", 5),
 *     ("b", "ca1", "cb1", 13))
 *     .toDF("key", "cat1", "cat2", "value")
 *   data.createOrReplaceTempView("data")
 *
 *   val agg = data.groupBy($"key")
 *     .agg(
 *       count_distinct($"cat1").as("cat1_cnt"),
 *       count_distinct($"cat2").as("cat2_cnt"),
 *       sum($"value").as("total"))
 * }}}
 *
 * This translates to the following (pseudo) logical plan:
 * {{{
 * Aggregate(
 *    key = ['key]
 *    functions = [COUNT(DISTINCT 'cat1),
 *                 COUNT(DISTINCT 'cat2),
 *                 sum('value)]
 *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
 *   LocalTableScan [...]
 * }}}
 *
 * This rule rewrites this logical plan to the following (pseudo) logical plan:
 * {{{
 * Aggregate(
 *    key = ['key]
 *    functions = [count('cat1) FILTER (WHERE 'gid = 1),
 *                 count('cat2) FILTER (WHERE 'gid = 2),
 *                 first('total) ignore nulls FILTER (WHERE 'gid = 0)]
 *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
 *   Aggregate(
 *      key = ['key, 'cat1, 'cat2, 'gid]
 *      functions = [sum('value)]
 *      output = ['key, 'cat1, 'cat2, 'gid, 'total])
 *     Expand(
 *        projections = [('key, null, null, 0, cast('value as bigint)),
 *                       ('key, 'cat1, null, 1, null),
 *                       ('key, null, 'cat2, 2, null)]
 *        output = ['key, 'cat1, 'cat2, 'gid, 'value])
 *       LocalTableScan [...]
 * }}}
 *
 * Second example: aggregate function without distinct and with filter clauses (in sql):
 * {{{
 *   SELECT
 *     COUNT(DISTINCT cat1) as cat1_cnt,
 *     COUNT(DISTINCT cat2) as cat2_cnt,
 *     SUM(value) FILTER (WHERE id > 1) AS total
 *   FROM
 *     data
 *   GROUP BY
 *     key
 * }}}
 *
 * This translates to the following (pseudo) logical plan:
 * {{{
 * Aggregate(
 *    key = ['key]
 *    functions = [COUNT(DISTINCT 'cat1),
 *                 COUNT(DISTINCT 'cat2),
 *                 sum('value) FILTER (WHERE 'id > 1)]
 *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
 *   LocalTableScan [...]
 * }}}
 *
 * This rule rewrites this logical plan to the following (pseudo) logical plan:
 * {{{
 * Aggregate(
 *    key = ['key]
 *    functions = [count('cat1) FILTER (WHERE 'gid = 1),
 *                 count('cat2) FILTER (WHERE 'gid = 2),
 *                 first('total) ignore nulls FILTER (WHERE 'gid = 0)]
 *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
 *   Aggregate(
 *      key = ['key, 'cat1, 'cat2, 'gid]
 *      functions = [sum('value) FILTER (WHERE 'id > 1)]
 *      output = ['key, 'cat1, 'cat2, 'gid, 'total])
 *     Expand(
 *        projections = [('key, null, null, 0, cast('value as bigint), 'id),
 *                       ('key, 'cat1, null, 1, null, null),
 *                       ('key, null, 'cat2, 2, null, null)]
 *        output = ['key, 'cat1, 'cat2, 'gid, 'value, 'id])
 *       LocalTableScan [...]
 * }}}
 *
 * Third example: aggregate function with distinct and filter clauses (in sql):
 * {{{
 *   SELECT
 *     COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
 *     COUNT(DISTINCT cat2) FILTER (WHERE id > 2) as cat2_cnt,
 *     SUM(value) FILTER (WHERE id > 3) AS total
 *   FROM
 *     data
 *   GROUP BY
 *     key
 * }}}
 *
 * This translates to the following (pseudo) logical plan:
 * {{{
 * Aggregate(
 *    key = ['key]
 *    functions = [COUNT(DISTINCT 'cat1) FILTER (WHERE 'id > 1),
 *                 COUNT(DISTINCT 'cat2) FILTER (WHERE 'id > 2),
 *                 sum('value) FILTER (WHERE 'id > 3)]
 *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
 *   LocalTableScan [...]
 * }}}
 *
 * This rule rewrites this logical plan to the following (pseudo) logical plan:
 * {{{
 * Aggregate(
 *    key = ['key]
 *    functions = [count('cat1) FILTER (WHERE 'gid = 1 and 'max_cond1),
 *                 count('cat2) FILTER (WHERE 'gid = 2 and 'max_cond2),
 *                 first('total) ignore nulls FILTER (WHERE 'gid = 0)]
 *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
 *   Aggregate(
 *      key = ['key, 'cat1, 'cat2, 'gid]
 *      functions = [max('cond1), max('cond2), sum('value) FILTER (WHERE 'id > 3)]
 *      output = ['key, 'cat1, 'cat2, 'gid, 'max_cond1, 'max_cond2, 'total])
 *     Expand(
 *        projections = [('key, null, null, 0, null, null, cast('value as bigint), 'id),
 *                       ('key, 'cat1, null, 1, 'id > 1, null, null, null),
 *                       ('key, null, 'cat2, 2, null, 'id > 2, null, null)]
 *        output = ['key, 'cat1, 'cat2, 'gid, 'cond1, 'cond2, 'value, 'id])
 *       LocalTableScan [...]
 * }}}

@alamb
Contributor Author

alamb commented Mar 10, 2023

I think it would be better to avoid using ScalarValue altogether if you're already doing so?

Yes, if you can avoid using ScalarValue I agree that would be best.

@jychen7
Contributor

jychen7 commented Mar 11, 2023

I am not sure how it may inspire DataFusion yet; just for reference, there are two improvements in DuckDB about parallelizing distinct.

I am going to read the history of hash aggregation in DataFusion this week first and see if I can propose some ideas and a plan.

@mingmwang
Contributor

mingmwang commented Mar 29, 2023

The gist of the idea is to first move distinct columns as additional grouping columns to compute non-distinct aggregate results, and then use another round of aggregation to compute values for distinct expressions (since they have already been deduplicated in the first aggregation as grouping columns).

I will paste JavaDoc for Spark's RewriteDistinctAggregates below because it contains helpful examples

I can work on this. In SparkSQL, it leverages the Expand operator to achieve this. In DataFusion, we do not have an Expand operator; instead, we can leverage the Group By GroupingSet to achieve this (we need to use the GROUPING_ID() function with Case When Then exprs).

This PR needs to be merged first, so DataFusion can support the GROUPING() and GROUPING_ID() functions:
#5749

@yjshen
Member

yjshen commented Mar 29, 2023

In DataFusion, we do not have an Expand operator; instead, we can leverage the Group By GroupingSet to achieve this (we need to use the GROUPING_ID() function with Case When Then exprs).

We can also implement Expand in DataFusion. Additionally, Grouping Sets, CUBE, and ROLLUP functionalities can also be implemented through Expand in DataFusion. I think it will simplify the GroupedHashAggregateStream code a lot, by making it grouping sets agnostic.

@yjshen
Member

yjshen commented Mar 31, 2023

I would like to try the expand and rewrite approach to see how its performance differs from the current one.

@mingmwang
Contributor

I would like to try the expand and rewrite approach to see how its performance differs from the current one.

There is already a rule, SingleDistinctToGroupBy, which covers rewriting a single distinct into a group by; it does not cover the mixed distinct-with-normal-aggs and multi-distinct cases.

@mingmwang
Contributor

And I think the current way that DataFusion implements group by GroupingSet is better than SparkSQL's Group By + Expand approach.

@yjshen
Member

yjshen commented Mar 31, 2023

There is already a rule, SingleDistinctToGroupBy, which covers rewriting a single distinct into a group by; it does not cover the mixed distinct-with-normal-aggs and multi-distinct cases.

Yes, I'm aware of this.

And I think the current way that DataFusion implements group by GroupingSet is better than SparkSQL's Group By + Expand approach.

Could you elaborate on this?

My concern about the current GroupingSet implementation is the complexity it brings to GroupedHashAggregateStream. With the Expand approach, AggregateExec could be grouping sets agnostic. So I would like to study the performance impact on:

  1. GroupingSet directly in AggregateExec vs GroupingSet expanded in analyzer/optimizer and AggregateExec grouping sets agnostic.
  2. Distinct with expand approach performance vs the current distinct performance

Also, rewriting distinct with Expand would make memory tracking for Aggregate more feasible, and the aggregate state buffer more natural with the row format.

@mingmwang
Contributor

mingmwang commented Mar 31, 2023

SparkSQL's Expand approach will expand the number of rows (more data copying for all the agg + group by columns) that flow into the AggregateExec. DataFusion's approach only expands the grouping columns during the evaluation of the group by; agg columns will not be expanded, so there is less data copying. And it is possible that we can implement some specific optimizations, like avoiding calculating the hash values for duplicated group columns.
Maybe we can add another group stream implementation, GroupedExpandHashAggregateStream, and move the GroupingSet-specific logic there.

I think one reason for the slowness is that DataFusion's aggregation framework is more complex: it involves three data structures (the hashmap, the global group agg state vec, and the impacted group id idx vec), more complex control flow, and a less efficient memory access pattern, especially when grouping by high-cardinality columns.

Compared to DataFusion, Databend's aggregation framework and control flow are more straightforward: the value in the hashmap is just a memory address, and the agg Accumulators update that memory address directly. It is more unsafe and closer to the C++ way. I think they learned it from ClickHouse.

@yjshen
Member

yjshen commented Mar 31, 2023

SparkSQL's Expand approach will expand the number of rows (more data copying for all the agg + group by columns) that flow into the AggregateExec.

Yes, there will be more rows, but I don't think there will be more data copies. The values in unrelated group columns will be nullified and the state buffer will be created from its initial state without copying the aggregate column.

And it is possible that we can implement some specific optimizations, like avoiding calculating the hash values for duplicated group columns.
I think one reason for the slowness is that DataFusion's aggregation framework is more complex: it involves three data structures (the hashmap, the global group agg state vec, and the impacted group id idx vec), more complex control flow, and a less efficient memory access pattern, especially when grouping by high-cardinality columns.

We could talk about aggregate implementations/optimizations at #4973. And I believe Databend has made a lot of specific optimizations for the ClickHouse workload.

@alamb
Contributor Author

alamb commented Feb 2, 2024

I think the recent work to specialize COUNT DISTINCT for strings and native types means our performance here is now pretty good: #8849, #8721.

I am not sure this ticket is tracking anything specific anymore, so I am closing it down for now.

@alamb alamb closed this as completed Feb 2, 2024