Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Extended Tokomak optimizer #1066

Closed
wants to merge 5 commits into from
Closed

Conversation

pjmore
Copy link
Contributor

@pjmore pjmore commented Oct 1, 2021

Related to #440

Looking on some feedback on a Tokomak based optimizer. I've added support for nearly all expressions as well as a number of simplification rules and constant folding rules. This is very much a work in progress as a number of the matching and conversion functions are pretty ugly and the expression parsing is pretty bad at the moment. I mostly wanted to get input on a couple things.

A bunch of these optimizations require the ability to execute expressions on literal values. I think the best way to allow external evaluation without implementing everything twice is for the physical expressions that it makes sense to execute on literal expressions to have the core logic for evaluation split out into an associated function.

The main risk I see with implementing it this way is that some physical expressions like BinaryExpr insert new type casting expressions in to the tree when they are constructed and it would be easy to miss updating the corresponding code in the optimizer.

I also added function volatility categories that are copy pasted from Postgres's definition, which has three categories: immutable, stable, and volatile. I'm not sure if datafusion needs a stable category though as that is for functions which will return the same values for the same arguments within a transaction.

Work that needs to be done before I'd consider this ready to even consider merging:

  • Clean up functions in utils
  • Determine how configurable the optimizer should be? Should users be able to register their own rewrite rules? Or just select which optimization categories to run or neither?
  • Clean up how expressions are parsed. Currently there is some pretty hacky code so that the udf in the expression
    (call udf[pow] (list 1 2))
    is parsed as a call to a udf and not a call expressions with a string as the second argument. While this isn't hugely important if the optimizer doesn't allow user defined rewrite rules it can make writing patterns involving strings and udfs brittle.
  • More tests
  • Determine if some classes like TokomakScalar and TokomakDatatype can be removed in favor of modifying their arrow/datafusion equivalents.
  • Change the datatypes involving time to avoid using brackets. I haven't tested this but I'm fairly certain that that would break how s expressions are parsed by egg.
  • Add support for expressions calling windowed functions.

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Oct 1, 2021
@Dandandan
Copy link
Contributor

Thanks for continuing this @pjmore , looks great! I will make some time for feedback

@houqp
Copy link
Member

houqp commented Oct 1, 2021

This looks really cool, thanks @pjmore for picking up the tokomak work :) I can help do a close review during the weekend too.

rw!("const-prop-between_inverted"; "(and (< ?e ?a)(> ?e ?b))" => "true" if var_gt(a,b)),
rw!("const-prop-binop-col-eq"; "(= ?a ?a)" => "(is_not_null ?a)"),
rw!("const-prop-binop-col-neq"; "(<> ?a ?a)" => "false"),
rw!("const-prop-add"; "(+ ?a ?b)"=>{ const_binop(a,b, Operator::Plus) } ),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we generalize these rules somehow to apply for all (binary) operators?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah for sure, egg has a Searcher trait which can do this. I'll take a Crack at using that instead of the whole list of rules.

Copy link
Member

@jorgecarleitao jorgecarleitao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for this PR!

I only went through the changes in DataFusion. Overall they look great. I have some proposals to make the volatility more expressive and first class citizen.

Wrt to the optimizer itself, my recommendation is to make it a new crate under this repo, have a feature flag for it, and add it to the default list of optimizers when the feature is active: IMO, as a developer working in other parts of datafusion, I should not have to compile the optimizer every time I do a change to some other part of the crate.

@@ -92,6 +95,23 @@ impl ScalarUDF {
name: name.to_owned(),
signature: signature.clone(),
return_type: return_type.clone(),
volatility: Default::default(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would move this to the function signature: volatility is an important concept that imo should be part of the new (which is where all properties are introduced). Users are free to define a new new_scalar with a default volatility if their scalars are systematically volatile.

Comment on lines +103 to +114
pub fn with_volatility(
name: &str,
signature: &Signature,
return_type: &ReturnTypeFunction,
fun: &ScalarFunctionImplementation,
volatility: FunctionVolatility,
) -> Self {
Self {
name: name.to_owned(),
signature: signature.clone(),
return_type: return_type.clone(),
volatility,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed :)

impl PhysicalExpr for BinaryExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
binary_operator_data_type(
&self.left.data_type(input_schema)?,
&self.op,
&self.right.data_type(input_schema)?,
)
}

fn nullable(&self, input_schema: &Schema) -> Result<bool> {
Ok(self.left.nullable(input_schema)? || self.right.nullable(input_schema)?)
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let left_value = self.left.evaluate(batch)?;
let right_value = self.right.evaluate(batch)?;
impl BinaryExpr {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a simplification, (right?)

DataType::Timestamp(TimeUnit::Microsecond, None)
}
TokomakDataType::TimestampNanosecond => {
DataType::Timestamp(TimeUnit::Nanosecond, None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(no action needed, just a note) IMO we should solve this Timestamp with timezone ASAP: the arrow format supports timezone, and here we are already assuming that we not use it. However, this is tricky because we are just making it harder to support timezones.

Note that the "big" problem with timezones is that the DataType::Timestamp() is virtually uncountable, which usually makes typing systems a bit difficult to build on top of.

@@ -1428,6 +1460,423 @@ where
})
}

pub(crate) fn create_immutable_impl(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed to make (most/all) builtin immutable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like lots of duplication between this and create_physical_fun, would be good to refactor and reuse the code so they don't get out of sync.

///Volatile - Functions where output can vary for each call.
///For more information see https://www.postgresql.org/docs/current/xfunc-volatility.html
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(u32)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this representation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No particular reason I just wanted to be able to filter on the volatility being equal to or stricter than a given level and I wasn't sure how the derived derived implementations would do it. I'll remove the representation and switch to the default derived version.

Volatile = 2,
}

impl Default for FunctionVolatility {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO there is no default volatility (i.e. I am -1 on this). Defaults make it very easy for someone to just use the default and ignore the semantics of volatility.

For example, the volatility in spark never made it to pyspark, which made it impossible for users to write immutable UDFs, which forced some filtering pushdown to not be applied (even if the user knows that the UDF is immutable).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes a lot of sense. I'll remove the default volatility.

Comment on lines +556 to +560
///Function volatility determines when a function can be inlined or in the future have evaluations elided when the arguments are the same
///Immutable - a pure function which always remains the same
///Stable - Maybe not required?. Used for functions which can modify the db but are stable for multiple calls in statement
///Volatile - Functions where output can vary for each call.
///For more information see https://www.postgresql.org/docs/current/xfunc-volatility.html
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
///Function volatility determines when a function can be inlined or in the future have evaluations elided when the arguments are the same
///Immutable - a pure function which always remains the same
///Stable - Maybe not required?. Used for functions which can modify the db but are stable for multiple calls in statement
///Volatile - Functions where output can vary for each call.
///For more information see https://www.postgresql.org/docs/current/xfunc-volatility.html
/// Determines when a function can be inlined or in the future have evaluations elided when the arguments are the same
/// For more information see https://www.postgresql.org/docs/current/xfunc-volatility.html

We can use the description of each item to document the enum

///Stable - Maybe not required?. Used for functions which can modify the db but are stable for multiple calls in statement
///Volatile - Functions where output can vary for each call.
///For more information see https://www.postgresql.org/docs/current/xfunc-volatility.html
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]

Comment on lines +564 to +569
///Immutable - a pure function which always remains the same
Immutable = 0,
///Stable - Maybe not required?. Used for functions which can modify the db but are stable for multiple calls in statement is this relevant for datafusion as in mem?
Stable = 1,
///Volatile - Functions where output can vary for each call.
Volatile = 2,
Copy link
Member

@jorgecarleitao jorgecarleitao Oct 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
///Immutable - a pure function which always remains the same
Immutable = 0,
///Stable - Maybe not required?. Used for functions which can modify the db but are stable for multiple calls in statement is this relevant for datafusion as in mem?
Stable = 1,
///Volatile - Functions where output can vary for each call.
Volatile = 2,
/// A pure function - one whose always returns the same given inputs and has no side-effects. (E.g. `SQRT`)
Immutable,
/// A function whose output can vary for each call. (E.g. `CURRENT_DATE`)
Volatile,

If not required for now, remove it for now :P

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW now() is an example of a stable function I believe

The distinction is that within a query, all instances of now() in the same query evaluate to the same value, though for different queries they may evaluate differently. Volatile functions such as rand() really can and do produce a different value for each invocation even within the same query.

So from DataFusion's perspective, we should be able to fold stable and immutable functions (not volatile)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point @alamb , I agree and I think we should keep the Stable variant.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this PR @pjmore -- very cool

Structurally I like @jorgecarleitao 's idea of making a new crate and using the pluggable optimizer framework for the actual connection to tokomak - this would both keep compile time down as well as allow users to opt in to the new optimizer until it was ready.

I think at least two of the supporting infrastructure items you mention, such as "execute expressions on literal values" and "volatility categories" would be good candidates to add to the main crate as they would be generally useful (e.g. we could expand our existing constant folding optimizer pass significantly with such infrastructure, even as we worked out the kinks with tokomak). Breaking them up might also help with review and getting them in.

If you agree, perhaps I could file tickets for those two items?

@pjmore
Copy link
Contributor Author

pjmore commented Oct 2, 2021

@alamb Sounds good to me!

rw!("converse-lte"; "(<= ?x ?y)" => "(>= ?y ?x)"),
rw!("add-0"; "(+ ?x 0)" => "?x"),
rw!("add-assoc"; "(+ (+ ?a ?b) ?c)" => "(+ ?a (+ ?b ?c))"),
rw!("minus-0"; "(- ?x 0)" => "?x"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing mul-0 and 0-div?

) -> Result<ScalarValue> {
CastExpr::cast_scalar_type(scalar, cast_type, &DEFAULT_DATAFUSION_CAST_OPTIONS)
}
pub(crate) fn cast_scalar_type(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick, perhaps these can be implemented as simple pub(crate) functions like the cast_column function below? Any reason why we want to make it a struct method?

@@ -1428,6 +1460,423 @@ where
})
}

pub(crate) fn create_immutable_impl(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like lots of duplication between this and create_physical_fun, would be good to refactor and reuse the code so they don't get out of sync.

@alamb
Copy link
Contributor

alamb commented Oct 4, 2021

Filed #1069 for volitality -- I will file another for partial evaluation

@alamb
Copy link
Contributor

alamb commented Oct 4, 2021

#1070 for general purpose constant folding

@pjmore
Copy link
Contributor Author

pjmore commented Oct 5, 2021

Thanks for all the feedback on this! I'm going to work on implementing the supporting issues and then move the optimizer to its own crate.

@pjmore pjmore closed this Oct 5, 2021
@alamb
Copy link
Contributor

alamb commented Oct 5, 2021

Thanks @pjmore -- looking forward to some great stuff

@Dandandan
Copy link
Contributor

Hey @pjmore did you do any follow up work on this? I think it would be great to have these improvements you made in source control.

@pjmore
Copy link
Contributor Author

pjmore commented Dec 23, 2021

@Dandandan The current iteration of the code is here. It can handle optimizing plans now and is mostly ready to go I'm just doing a bit of clean up at the moment.

@Dandandan
Copy link
Contributor

@Dandandan The current iteration of the code is here. It can handle optimizing plans now and is mostly ready to go I'm just doing a bit of clean up at the moment.

Awesome! Let me know when/where I can help.

@pjmore
Copy link
Contributor Author

pjmore commented Dec 25, 2021

At this point I think there are two remaining chunks of work. Writing tests and making sure that it respects any invariants like schema field names. I'm going to open a draft pull request and get started on the tests.

@alamb
Copy link
Contributor

alamb commented Dec 26, 2021

At this point I think there are two remaining chunks of work. Writing tests and making sure that it respects any invariants like schema field names. I'm going to open a draft pull request and get started on the tests.

you might find the tests in https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/optimizer/simplify_expressions.rs useful

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants