Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend the simple UDAF interface with function-level states #9167

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 84 additions & 36 deletions velox/docs/develop/aggregate-functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,33 @@ A simple aggregation function is implemented as a class as the following.
using IntermediateType = Array<Generic<T1>>;
using OutputType = Array<Generic<T1>>;

// Define a struct for function-level states. Even if the aggregation function
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make 'FunctionState' struct optional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to pass FunctionState as an input parameter for methods like addInput, combine, etc. If it's not defined, It will result in a compilation failure.

// doesn't use function-level states, it is still necessary to define an empty
// FunctionState struct.
struct FunctionState {
// Optional.
TypePtr resultType;
};

// Optional. Defined only when the aggregation function needs to use function-level states.
// This method is called once when the aggregation function is created.
static void initialize(
core::AggregationNode::Step step,
FunctionState& state,
const std::vector<TypePtr>& rawInputTypes,
const TypePtr& resultType,
const std::vector<VectorPtr>& constantInputs,
std::optional<core::AggregationNode::Step> companionStep) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you document all the parameters? What is 'companionStep'? It seems strange that we have such a parameter as function implementations should be agnostic to whether they are used as "regular" or a "companion" function.

CC: @kagamiori

Copy link
Contributor Author

@liujiayi771 liujiayi771 Apr 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added document for all parameters in the definition of initialize in Aggregate.h. Should I also add these comments in the rst documentation?

For the partial companion function, we need to know that its companion step is kPartial, while the agg function itself includes kPartial, kFinal, or kSingle steps.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liujiayi771 Thank you for updating PR description. It sounds like we want to allow stateful simple aggregate functions. This makes sense, but I wonder if we can make it work similar to Simple Function API for scalar functions. There, function author defines a struct with call-once initialize and call-per-row call methods. The author is then free to add member variables to hold state and initialize it however they want from 'initialize'. Would it make sense to follow this pattern for aggregate functions as well?

The result type is currently not exposed to author-defined accumulator type in the simple function interface.

Can we expose this?

Some functions perform heavy computation on a constant argument before
processing all input rows, such as approx_most_frequent, and store the
computation result in a function-level state.

Would you clarify what is "heavy computation" done by approx_most_frequent to help readers understand a bit better?

function-level state.

I wonder if a more accurate term would be "per-instance state". There is only one function, foo, but there are many instances of 'foo' and each has its own state, no?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mbasmanova,

but I wonder if we can make it work similar to Simple Function API for scalar functions. There, function author defines a struct with call-once initialize and call-per-row call methods. The author is then free to add member variables to hold state and initialize it however they want from 'initialize'.

The SimpleAggregateAdapter currently doesn't hold an instance of the user-defined simple UDAF class (i.e., it only creates instances of the AccumulatorType struct inside the UDAF class). We can change SimpleAggregateAdapter to hold an instance of the UDAF class if we want to allow authors to freely access member variables in it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can change SimpleAggregateAdapter to hold an instance of the UDAF class if we want to allow authors to freely access member variables in it.

This would be nice. Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @kagamiori, I will try to understand this method and see how it can be modified.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @liujiayi771, sorry for the delay. Here is a code pointer of how the simple scalar function interface calls the initialize() method:

return (*fn_).initialize(inputTypes, config, values...);

The fn_ here is a std::unique_ptr of the UDF class. Because this UDF instance is created, the author can add function-level states as member variables in the UDF class and the UDF's initialize() and call() member methods can access them directly. An example UDF that uses initialize() is below.

FOLLY_ALWAYS_INLINE void initialize(

What @mbasmanova suggested is that we can do it similar in the SimpleAggregateAdapter so that the UDAF authors doesn't have to keep function-level states. Specifically, below is what I’m thinking:

  1. The UDAF author doesn’t define a FunctionState struct, but rather add function-level variables as data members in the UDAF class (outside of its AccumulatorType struct).
  2. The UDAF class has an initialize() method that receives the aggregation step, the types, and the constantInput, and assigns values to the data members in the UDAF class.
  3. The AccumulatorType struct has a data member that is a pointer to the UDAF class. This would allow member methods inside AccumulatorType to access data members in the UDAF class. This UDAF-pointer can be set in SimpleAggregateAdapter::initializeNewGroupsInternal().

I’ll try to make a prototype to see if this works. Let’s discuss and review this design before coding in #8711 first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your reply @kagamiori. Let's discuss in #8711 further once the prototype has been validated for feasibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @kagamiori. Do you have time to make a prototype for this design. cc @rui-mo.

state.resultType = resultType;
}

// Optional. Default is true.
static constexpr bool default_null_behavior_ = false;

// Optional.
static bool toIntermediate(
exec::out_type<Array<Generic<T1>>>& out,
exec::optional_arg_type<Generic<T1>> in);
exec::out_type<Array<Generic<T1>>>& out,
exec::optional_arg_type<Generic<T1>> in);

struct AccumulatorType { ... };
};
Expand All @@ -169,6 +189,15 @@ function's argument type(s) wrapped in a Row<> even if the function only takes
one argument. This is needed for the SimpleAggregateAdapter to parse input
types for arbitrary aggregation functions properly.

A FunctionState struct needs to be declared in the simple aggregation function
class. FunctionState is initialized once when the aggregation function is
created and used at every row when adding inputs to accumulators or extracting
values from accumulators. For example, if the aggregation function needs to get
the result type or the raw input type of the aggregaiton function, the author
can hold them in the FunctionState struct, and initialize them in the
initialize() method. If the aggregation function does not require the use of
FunctionState, it is necessary to declare an empty FunctionState struct.

The author can define an optional flag `default_null_behavior_` indicating
whether the aggregation function has default-null behavior. This flag is true
by default. Next, the class can have an optional method `toIntermediate()`
Expand Down Expand Up @@ -257,17 +286,21 @@ For aggregaiton functions of default-null behavior, the author defines an
// Optional. Default is false.
static constexpr bool is_aligned_ = true;

explicit AccumulatorType(HashStringAllocator* allocator);
explicit AccumulatorType(HashStringAllocator* allocator, const FunctionState& state);

void addInput(HashStringAllocator* allocator, exec::arg_type<T1> value1, ...);
void addInput(
HashStringAllocator* allocator,
exec::arg_type<T1> value1, ...,
const FunctionState& state);

void combine(
HashStringAllocator* allocator,
exec::arg_type<IntermediateType> other);
exec::arg_type<IntermediateType> other,
const FunctionState& state);

bool writeIntermediateResult(exec::out_type<IntermediateType>& out);
bool writeIntermediateResult(exec::out_type<IntermediateType>& out, const FunctionState& state);

bool writeFinalResult(exec::out_type<OutputType>& out);
bool writeFinalResult(exec::out_type<OutputType>& out, const FunctionState& state);

// Optional. Called during destruction.
void destroy(HashStringAllocator* allocator);
Expand Down Expand Up @@ -296,7 +329,8 @@ addInput

This method adds raw input values to *this* accumulator. It receives a
`HashStringAllocator*` followed by `exec::arg_type<T1>`-typed values, one for
each argument type `Ti` wrapped in InputType.
each argument type `Ti` wrapped in InputType. `const FunctionState&` hold the
function-level variables.

With default-null behavior, raw-input rows where at least one column is null are
ignored before `addInput` is called. After `addInput` is called, *this*
Expand All @@ -306,31 +340,32 @@ combine
"""""""

This method adds an input intermediate state to *this* accumulator. It receives
a `HashStringAllocator*` and one `exec::arg_type<IntermediateType>` value. With
default-null behavior, nulls among the input intermediate states are ignored
before `combine` is called. After `combine` is called, *this* accumulator is
assumed to be non-null.
a `HashStringAllocator*` and one `exec::arg_type<IntermediateType>` value.
`const FunctionState&` hold the function-level variables. With default-null
behavior, nulls among the input intermediate states are ignored before `combine`
is called. After `combine` is called, *this* accumulator is assumed to be non-null.

writeIntermediateResult
"""""""""""""""""""""""

This method writes *this* accumulator out to an intermediate state vector. It
has an out-parameter of the type `exec::out_type<IntermediateType>&`. This
method returns true if it writes a non-null value to `out`, or returns false
meaning a null should be written to the intermediate state vector. Accumulators
that are nulls (i.e., no value has been added to them) automatically become
nulls in the intermediate state vector without `writeIntermediateResult` being
called.
has an out-parameter of the type `exec::out_type<IntermediateType>&`.
`const FunctionState&` hold the function-level variables. This method returns
true if it writes a non-null value to `out`, or returns false meaning a null
should be written to the intermediate state vector. Accumulators that are
nulls (i.e., no value has been added to them) automatically become nulls in
the intermediate state vector without `writeIntermediateResult` being called.

writeFinalResult
""""""""""""""""

This method writes *this* accumulator out to a final result vector. It
has an out-parameter of the type `exec::out_type<OutputType>&`. This
method returns true if it writes a non-null value to `out`, or returns false
meaning a null should be written to the final result vector. Accumulators
that are nulls (i.e., no value has been added to them) automatically become
nulls in the final result vector without `writeFinalResult` being called.
has an out-parameter of the type `exec::out_type<OutputType>&`.
`const FunctionState&` hold the function-level variables. This method returns
true if it writes a non-null value to `out`, or returns false meaning a null
should be written to the final result vector. Accumulators that are
nulls (i.e., no value has been added to them) automatically become nulls in the
final result vector without `writeFinalResult` being called.

AccumulatorType of Non-Default-Null Behavior
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand All @@ -355,15 +390,25 @@ For aggregaiton functions of non-default-null behavior, the author defines an

explicit AccumulatorType(HashStringAllocator* allocator);

bool addInput(HashStringAllocator* allocator, exec::optional_arg_type<T1> value1, ...);
bool addInput(
HashStringAllocator* allocator,
exec::optional_arg_type<T1> value1, ...,
const FunctionState& state);

bool combine(
HashStringAllocator* allocator,
exec::optional_arg_type<IntermediateType> other);
exec::optional_arg_type<IntermediateType> other,
const FunctionState& state);

bool writeIntermediateResult(bool nonNullGroup, exec::out_type<IntermediateType>& out);
bool writeIntermediateResult(
bool nonNullGroup,
exec::out_type<IntermediateType>& out,
const FunctionState& state);

bool writeFinalResult(bool nonNullGroup, exec::out_type<OutputType>& out);
bool writeFinalResult(
bool nonNullGroup,
exec::out_type<OutputType>& out,
const FunctionState& state);

// Optional.
void destroy(HashStringAllocator* allocator);
Expand All @@ -384,7 +429,7 @@ addInput

This method receives a `HashStringAllocator*` followed by
`exec::optional_arg_type<T1>` values, one for each argument type `Ti` wrapped
in InputType.
in InputType. `const FunctionState&` hold the function-level variables.

This method is called on all raw-input rows even if some columns may be null.
It returns a boolean meaning whether *this* accumulator is non-null after the
Expand All @@ -397,26 +442,29 @@ combine
"""""""

This method receives a `HashStringAllocator*` and an
`exec::optional_arg_type<IntermediateType>` value. This method is called on
all intermediate states even if some are nulls. Same as `addInput`, this method
returns a boolean meaning whether *this* accumulator is non-null after the call.
`exec::optional_arg_type<IntermediateType>` value. `const FunctionState&` hold
the function-level variables.This method is called on all intermediate states
even if some are nulls. Same as `addInput`, this method returns a boolean
meaning whether *this* accumulator is non-null after the call.

writeIntermediateResult
"""""""""""""""""""""""

This method has an out-parameter of the type `exec::out_type<IntermediateType>&`
and a boolean flag `nonNullGroup` indicating whether *this* accumulator is
non-null. This method returns true if it writes a non-null value to `out`, or
return false meaning a null should be written to the intermediate state vector.
non-null. `const FunctionState&` hold the function-level variables. This method
returns true if it writes a non-null value to `out`, or return false meaning a
null should be written to the intermediate state vector.

writeFinalResult
""""""""""""""""

This method writes *this* accumulator out to a final result vector. It has an
out-parameter of the type `exec::out_type<OutputType>&` and a boolean flag
`nonNullGroup` indicating whether *this* accumulator is non-null. This method
returns true if it writes a non-null value to `out`, or return false meaning a
null should be written to the final result vector.
`nonNullGroup` indicating whether *this* accumulator is non-null.
`const FunctionState&` hold the function-level variables.This method returns
true if it writes a non-null value to `out`, or return false meaning a null
should be written to the final result vector.

Limitations
^^^^^^^^^^^
Expand Down
21 changes: 21 additions & 0 deletions velox/exec/Aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ class Aggregate {
rowSizeOffset);
}

// Initialize the function-level state of the simple function interface for
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... this API should not be aware of Simple Function Interface... looks like there might be some leak in the design.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mbasmanova, thank you for the feedback. I saw that for simple scalar functions, we call initialize() in the constructor of SimpleFunctionAdapter. It can do this because ExprCompiler passes constantInputs to the function factory. The Aggregate::create() and aggregation function factory currently do not receive constantInputs as an argument. What about we pass constantInputs to them and move the call of initialize() into the constructor of SimpleAggregateAdapter? We'll have to pass constantInputs to all aggregation function factories though, since we cannot tell simple UDAFs from regular UDAFs apart in the function registry.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Aggregate::create() and aggregation function factory currently do not receive constantInputs as an argument. What about we pass constantInputs to them and move the call of initialize() into the constructor of SimpleAggregateAdapter?

That sounds good. Thanks.

// UDAF.
// @param step The aggregation step.
// @param rawInputType The raw input type of the UDAF.
// @param resultType The result type of the current aggregation step.
// @param constantInputs Optional constant input values for aggregate
// function. constantInputs should be empty if there are no constant inputs,
// aligned with inputTypes if there is at least one constant input, with
// non-constant inputs represented as nullptr, and must be instances of
// ConstantVector.
// @param companionStep The step used to register aggregate companion
// functions. kPartial for partial companion function, kIntermediate for merge
// and merge extract companion function.
virtual void initialize(
core::AggregationNode::Step step,
const std::vector<TypePtr>& rawInputType,
const TypePtr& resultType,
const std::vector<VectorPtr>& constantInputs,
std::optional<core::AggregationNode::Step> companionStep = std::nullopt) {
}

// Initializes null flags and accumulators for newly encountered groups. This
// function should be called only once for each group.
//
Expand Down
61 changes: 61 additions & 0 deletions velox/exec/AggregateCompanionAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,41 @@ void AggregateCompanionFunctionBase::extractAccumulators(
fn_->extractAccumulators(groups, numGroups, result);
}

void AggregateCompanionAdapter::PartialFunction::initialize(
core::AggregationNode::Step step,
const std::vector<TypePtr>& rawInputType,
const facebook::velox::TypePtr& resultType,
const std::vector<VectorPtr>& constantInputs,
std::optional<core::AggregationNode::Step> /*companionStep*/) {
fn_->initialize(
step,
rawInputType,
resultType,
constantInputs,
core::AggregationNode::Step::kPartial);
}

void AggregateCompanionAdapter::PartialFunction::extractValues(
char** groups,
int32_t numGroups,
VectorPtr* result) {
fn_->extractAccumulators(groups, numGroups, result);
}

void AggregateCompanionAdapter::MergeFunction::initialize(
core::AggregationNode::Step step,
const std::vector<TypePtr>& rawInputType,
const facebook::velox::TypePtr& resultType,
const std::vector<VectorPtr>& constantInputs,
std::optional<core::AggregationNode::Step> /*companionStep*/) {
fn_->initialize(
step,
rawInputType,
resultType,
constantInputs,
core::AggregationNode::Step::kIntermediate);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also override the initialize() method of MergeExtractFunction to pass kFinal as the companion step.

}

void AggregateCompanionAdapter::MergeFunction::addRawInput(
char** groups,
const SelectivityVector& rows,
Expand All @@ -156,6 +184,20 @@ void AggregateCompanionAdapter::MergeFunction::extractValues(
fn_->extractAccumulators(groups, numGroups, result);
}

void AggregateCompanionAdapter::MergeExtractFunction::initialize(
core::AggregationNode::Step step,
const std::vector<TypePtr>& rawInputType,
const facebook::velox::TypePtr& resultType,
const std::vector<VectorPtr>& constantInputs,
std::optional<core::AggregationNode::Step> /*companionStep*/) {
fn_->initialize(
step,
rawInputType,
resultType,
constantInputs,
core::AggregationNode::Step::kFinal);
}

void AggregateCompanionAdapter::MergeExtractFunction::extractValues(
char** groups,
int32_t numGroups,
Expand Down Expand Up @@ -229,6 +271,25 @@ void AggregateCompanionAdapter::ExtractFunction::apply(
// Perform per-row aggregation.
std::vector<vector_size_t> allSelectedRange;
rows.applyToSelected([&](auto row) { allSelectedRange.push_back(row); });

// Get the raw input types.
std::vector<TypePtr> rawInputTypes{args.size()};
std::vector<VectorPtr> constantInputs{args.size()};
for (auto i = 0; i < args.size(); i++) {
rawInputTypes[i] = args[i]->type();
if (args[i]->isConstantEncoding()) {
constantInputs[i] = args[i];
} else {
constantInputs[i] = nullptr;
}
}

fn_->initialize(
core::AggregationNode::Step::kFinal,
rawInputTypes,
outputType,
constantInputs,
core::AggregationNode::Step::kFinal);
fn_->initializeNewGroups(groups, allSelectedRange);
fn_->enableValidateIntermediateInputs();
fn_->addIntermediateResults(groups, rows, args, false);
Expand Down
21 changes: 21 additions & 0 deletions velox/exec/AggregateCompanionAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ struct AggregateCompanionAdapter {
const TypePtr& resultType)
: AggregateCompanionFunctionBase{std::move(fn), resultType} {}

void initialize(
core::AggregationNode::Step step,
const std::vector<TypePtr>& rawInputType,
const TypePtr& resultType,
const std::vector<VectorPtr>& constantInputs,
std::optional<core::AggregationNode::Step> companionStep) override;

void extractValues(char** groups, int32_t numGroups, VectorPtr* result)
override;
};
Expand All @@ -110,6 +117,13 @@ struct AggregateCompanionAdapter {
const TypePtr& resultType)
: AggregateCompanionFunctionBase{std::move(fn), resultType} {}

void initialize(
core::AggregationNode::Step step,
const std::vector<TypePtr>& rawInputType,
const TypePtr& resultType,
const std::vector<VectorPtr>& constantInputs,
std::optional<core::AggregationNode::Step> companionStep) override;

void addRawInput(
char** groups,
const SelectivityVector& rows,
Expand All @@ -133,6 +147,13 @@ struct AggregateCompanionAdapter {
const TypePtr& resultType)
: MergeFunction{std::move(fn), resultType} {}

void initialize(
core::AggregationNode::Step step,
const std::vector<TypePtr>& rawInputType,
const TypePtr& resultType,
const std::vector<VectorPtr>& constantInputs,
std::optional<core::AggregationNode::Step> companionStep) override;

void extractValues(char** groups, int32_t numGroups, VectorPtr* result)
override;
};
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/AggregateInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ std::vector<AggregateInfo> toAggregateInfo(
aggResultType,
operatorCtx.driverCtx()->queryConfig());

info.function->initialize(
step, aggregate.rawInputTypes, aggResultType, info.constantInputs);
auto lambdas = extractLambdaInputs(aggregate);
if (!lambdas.empty()) {
if (expressionEvaluator == nullptr) {
Expand Down
Loading
Loading