[Design] Exchange Materialization #12387
This issue was referenced on Feb 26, Mar 12, Apr 10, and Jul 16, 2019.
wenleix changed the title from "[Proposal] Support Materialized Exchange" to "[Proposal] Materialized Exchange" on Mar 5, 2019.
wenleix changed the title from "[Proposal] Materialized Exchange" to "[Design] Exchange Materialization" on Mar 15, 2019.
This issue has been automatically marked as stale because it has not had any activity in the last 2 years. If you feel that this issue is important, just comment and the stale tag will be removed; otherwise it will be closed in 7 days. This is an attempt to ensure that our open issues remain valuable and relevant so that we can keep track of what needs to be done and prioritize the right things.
A comment-friendly version can be found in https://docs.google.com/document/d/1pQOOsveEN6KQPxiNDIii03lqa5x_O-HXWLt_T80dhRo/edit?usp=sharing .
Introduction
Grouped execution was introduced to Presto in #8951 to support the huge joins and aggregations that arise in ETL pipelines.
When the input tables are already partitioned on the join key or aggregation key (e.g. bucketed tables in Hive), Presto can process a subset of the partitions at a time. This reduces the amount of memory needed to hold the hash table and opens opportunities for partial query recovery (see #12124 for more details).
While grouped execution lets Presto run large ETL jobs (large in terms of both memory and duration), it does not work for unpartitioned tables. To make grouped execution work in such cases, we could materialize the exchange (modeled as a temporary partitioned table).
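To illustrate why partitioned input helps, here is a minimal Java sketch (not Presto code) of a join run with grouped execution over a materialized exchange. Both sides are first written into N buckets by hashing the join key, standing in for the temporary partitioned table; then each bucket's hash table is built and probed on its own, so only one bucket's hash table is live at a time. All class and method names (Row, JoinResult, GroupedJoinSketch, bucketize, joinBucketByBucket) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row model: (join key, payload). Not Presto's Page/Block representation.
record Row(long key, String value) {}

// Joined rows plus the largest number of build rows held in one hash table.
record JoinResult(List<String> rows, int peakBuildRows) {}

final class GroupedJoinSketch {
    // Stand-in for a materialized REPARTITION exchange: route each row into one of
    // bucketCount "files" by hash(key) mod bucketCount.
    static List<List<Row>> bucketize(List<Row> rows, int bucketCount) {
        List<List<Row>> buckets = new ArrayList<>();
        for (int i = 0; i < bucketCount; i++) {
            buckets.add(new ArrayList<>());
        }
        for (Row row : rows) {
            int bucket = Math.floorMod(Long.hashCode(row.key()), bucketCount);
            buckets.get(bucket).add(row);
        }
        return buckets;
    }

    // Grouped execution: bucket i of the build side is joined only with bucket i of the
    // probe side, so the hash table never holds more than one bucket of build rows.
    static JoinResult joinBucketByBucket(List<List<Row>> build, List<List<Row>> probe) {
        if (build.size() != probe.size()) {
            throw new IllegalArgumentException("both sides must use the same bucket count");
        }
        List<String> output = new ArrayList<>();
        int peakBuildRows = 0;
        for (int bucket = 0; bucket < build.size(); bucket++) {
            // Build the hash table for this bucket only; it is discarded after the iteration.
            Map<Long, List<String>> hashTable = new HashMap<>();
            for (Row row : build.get(bucket)) {
                hashTable.computeIfAbsent(row.key(), k -> new ArrayList<>()).add(row.value());
            }
            peakBuildRows = Math.max(peakBuildRows, build.get(bucket).size());
            // Probe with the matching bucket of the other side.
            for (Row row : probe.get(bucket)) {
                for (String buildValue : hashTable.getOrDefault(row.key(), List.of())) {
                    output.add(buildValue + "|" + row.value());
                }
            }
        }
        return new JoinResult(output, peakBuildRows);
    }
}
```

When keys spread evenly, peakBuildRows is roughly the build-side size divided by the bucket count; that is the memory saving grouped execution relies on, paid for by writing and reading the intermediate data once.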
Materialized Exchange
In this section we discuss the key question: which exchanges should be materialized.
We discuss Common Table Expressions (CTEs) and materializing every REPARTITION exchange, while leaving cost-based optimization (CBO) as future work. We have also evaluated the possibility of query hints (see appendix), but they were not considered at this time since they introduce syntax that is not compatible with ANSI SQL.
Consider the following query:
We can use CTEs to hint that we want a materialization point for the relations defined in the query:
The implementation complexity of CTEs depends on how we implement them:
1. A prototype based on “meta-planning” is available (i.e. split the query based on its CTEs and plan each sub-query individually). In this case, CTEs do not participate in plan optimization and thus become optimization barriers.
2. Make CTEs part of planning by introducing CTESinkNode and CTESourceNode. This makes CTEs optimizable; however, it changes the plan from a tree to a DAG, and we expect significant engineering effort. A recent advance in plan optimization over CTEs can be found in “Optimization of Common Table Expressions in MPP Database Systems”.
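The first option can be pictured as a source-to-source split. The Java below is a hypothetical illustration, not the prototype's code: it assumes the WITH clause has already been parsed into an ordered map from CTE name to body, and that the resulting statements run in a session whose default schema holds the scratch tables. Each CTE becomes its own CREATE TABLE AS statement, which is why the planner can no longer optimize across the CTE boundary.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parsed form of a WITH query: CTE name -> body SQL, in definition order.
record WithQuery(LinkedHashMap<String, String> ctes, String mainQuery) {}

final class MetaPlanner {
    // Splits one WITH query into statements that are planned and executed separately.
    // A non-recursive CTE may only reference CTEs defined before it, so definition
    // order is a valid execution order. Because each statement is planned on its own,
    // no predicate or projection can be pushed across a CTE: it is an optimization barrier.
    static List<String> split(WithQuery query) {
        List<String> statements = new ArrayList<>();
        for (Map.Entry<String, String> cte : query.ctes().entrySet()) {
            // Assumes the session's default schema is a scratch schema for these tables.
            statements.add("CREATE TABLE " + cte.getKey() + " AS " + cte.getValue());
        }
        // The main query is unchanged; its references now resolve to the materialized tables.
        statements.add(query.mainQuery());
        return statements;
    }
}
```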
On the other hand, materializing all REPARTITION exchanges doesn't require any query change, and we can keep the existing tree-shaped plan.
We propose the following path:
1. We will introduce a session property that allows the engine to materialize exchanges. For now, a materialized exchange will simply be expanded into TableWriterOperator + TableScanOperator.
2. In the future, we should allow the decision to materialize an exchange to be made by the CBO. One thought is a session property like “materialize-exchange” that takes three values: NONE, ALWAYS, and AUTOMATIC.
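A hypothetical sketch of how the proposed three-valued property could drive the per-exchange decision. MaterializeExchangeMode, ExchangeKind, and MaterializationDecider are illustrative names, not existing Presto classes. The AUTOMATIC branch stubs out the CBO with a boolean supplied by the caller, since that part is future work.

```java
import java.util.Locale;

// Hypothetical values of the proposed "materialize-exchange" session property.
enum MaterializeExchangeMode {
    NONE, ALWAYS, AUTOMATIC;

    // Parses the session property value case-insensitively, e.g. "always" -> ALWAYS.
    static MaterializeExchangeMode fromSessionValue(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ENGLISH));
    }
}

// Hypothetical exchange kinds; only REPARTITION exchanges are candidates in this proposal.
enum ExchangeKind { GATHER, REPARTITION, REPLICATE }

final class MaterializationDecider {
    // Decides whether a single exchange is materialized. The CBO decision for AUTOMATIC
    // is not modeled here and is passed in as cboPrefersMaterialization.
    static boolean shouldMaterialize(MaterializeExchangeMode mode, ExchangeKind kind, boolean cboPrefersMaterialization) {
        if (kind != ExchangeKind.REPARTITION) {
            return false;
        }
        return switch (mode) {
            case NONE -> false;
            case ALWAYS -> true;
            case AUTOMATIC -> cboPrefersMaterialization;
        };
    }
}
```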
We also need to think about whether it makes sense to abstract Exchange (e.g. introduce ExchangePageSink and ExchangePageSource, and migrate PartitionedOutputOperator to be a special case of them).
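One way the ExchangePageSink/ExchangePageSource abstraction might look, sketched in Java under our own assumptions: the two interface names come from the text, while their methods and the in-memory MaterializedExchange (a stand-in for the temporary partitioned table, with pages modeled as strings) are hypothetical. It highlights the contract difference from a streaming exchange: pages can be read only after the writers finish, and a partition can be re-read from the start.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sink contract: producers add pages to a target partition, then finish.
interface ExchangePageSink {
    void addPage(int partition, String page);
    void finish();
}

// Hypothetical source contract: returns the next page, or null once the partition is exhausted.
interface ExchangePageSource {
    String nextPage();
}

// Stand-in for a temporary partitioned table: one list of pages ("file") per partition.
final class MaterializedExchange implements ExchangePageSink {
    private final List<List<String>> partitions = new ArrayList<>();
    private boolean finished;

    MaterializedExchange(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    @Override
    public void addPage(int partition, String page) {
        if (finished) {
            throw new IllegalStateException("sink already finished");
        }
        partitions.get(partition).add(page);
    }

    @Override
    public void finish() {
        finished = true;
    }

    // Unlike a streaming exchange, readers start only after all writers have finished.
    // Each call returns a fresh source, so a failed consumer can re-read its partition.
    ExchangePageSource sourceFor(int partition) {
        if (!finished) {
            throw new IllegalStateException("exchange not yet materialized");
        }
        Deque<String> pages = new ArrayDeque<>(partitions.get(partition));
        return pages::poll;
    }
}
```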
Planner Support
In this section we discuss how to support materializing exchanges on the planner side. For the same example query, the original simplified plan looks like the following:
During plan fragmenting, the join will be assigned ungrouped execution, since the TableScans are not eligible (the tables are not partitioned; see GroupedExecutionTagger).
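The eligibility rule can be approximated as follows. This is a deliberately simplified, hypothetical check, not the logic of GroupedExecutionTagger: it treats a join as eligible only when both inputs are table scans bucketed on exactly the join keys with the same bucket count.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, heavily simplified plan model: a scan plus its optional bucketing.
record TableScan(String table, Optional<Bucketing> bucketing) {}
record Bucketing(List<String> columns, int bucketCount) {}

final class GroupedExecutionCheck {
    // A join can run bucket by bucket only if both sides are bucketed compatibly on the join keys.
    static boolean canUseGroupedJoin(TableScan left, List<String> leftKeys, TableScan right, List<String> rightKeys) {
        if (left.bucketing().isEmpty() || right.bucketing().isEmpty()) {
            return false; // an unpartitioned input forces ungrouped execution
        }
        Bucketing l = left.bucketing().get();
        Bucketing r = right.bucketing().get();
        return l.bucketCount() == r.bucketCount()
                && l.columns().equals(leftKeys)
                && r.columns().equals(rightKeys);
    }
}
```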
When exchanges are to be materialized, the plan is first “sectioned”, and the ExchangeNodes are replaced by TableWriter/Finish and TableScan:
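The rewrite itself can be sketched as a recursive plan transformation. The records below are simplified, hypothetical stand-ins that borrow Presto's node names (they are not Presto's classes), and the temporary table naming is made up. Each REPARTITION ExchangeNode becomes a writer section (TableFinish over TableWriter), and the consumer reads the temporary table through a TableScan. Sections are emitted bottom-up, so every writer section runs before the section that scans its output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified plan nodes (not Presto's PlanNode classes).
interface PlanNode {}
record TableScanNode(String table) implements PlanNode {}
record JoinNode(PlanNode left, PlanNode right) implements PlanNode {}
record ExchangeNode(boolean repartition, List<String> partitionKeys, PlanNode source) implements PlanNode {}
record TableWriterNode(String table, List<String> partitionKeys, PlanNode source) implements PlanNode {}
record TableFinishNode(PlanNode source) implements PlanNode {}

final class ExchangeMaterializer {
    private final List<PlanNode> sections = new ArrayList<>();
    private int nextTempTable;

    // Returns sections in execution order; the rewritten root section is last.
    static List<PlanNode> section(PlanNode root) {
        ExchangeMaterializer materializer = new ExchangeMaterializer();
        PlanNode rewrittenRoot = materializer.rewrite(root);
        materializer.sections.add(rewrittenRoot);
        return materializer.sections;
    }

    private PlanNode rewrite(PlanNode node) {
        if (node instanceof ExchangeNode exchange && exchange.repartition()) {
            // Rewrite the producer side first so nested exchanges become earlier sections.
            PlanNode source = rewrite(exchange.source());
            String temp = "tmp_exchange_" + nextTempTable++; // hypothetical naming scheme
            sections.add(new TableFinishNode(new TableWriterNode(temp, exchange.partitionKeys(), source)));
            // The consumer now reads the temporary partitioned table.
            return new TableScanNode(temp);
        }
        if (node instanceof ExchangeNode other) {
            // Non-REPARTITION exchanges stay in place; only their inputs are rewritten.
            return new ExchangeNode(false, other.partitionKeys(), rewrite(other.source()));
        }
        if (node instanceof JoinNode join) {
            return new JoinNode(rewrite(join.left()), rewrite(join.right()));
        }
        return node; // leaves such as TableScanNode are unchanged
    }
}
```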
TableLayout and SplitSource
Since the query is still planned once before it executes, these temporary materialized tables do not exist at planning time. This makes it difficult to construct the TableLayout and SplitSource.
We propose the following solution:
1. A new prepareExchangeMaterialization method is introduced to ConnectorMetadata, which returns the information required for planning, such as TableHandle, OutputHandle, and TableLayoutHandle. For the Hive connector, the temporary table will be registered in the in-memory SemiTransactionalMetastore. The data will be removed at the end of the transaction, and the table never gets committed to the Metastore.
2. Allow ConnectorSplitSource to be initialized lazily.
Execution Support for Multiple “Queries”
Andrii’s Prototype: arhimondr#1
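Lazily initializing the ConnectorSplitSource is what lets the sections run one after another. The sketch below uses a hypothetical, simplified SimpleSplitSource interface rather than Presto's actual ConnectorSplitSource API, and models splits as file names. The real source is created by a factory on the first request for splits, so it lists the temporary table's files only after the writing section has finished, even though the plan was built before those files existed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified split source (not Presto's ConnectorSplitSource).
interface SimpleSplitSource {
    List<String> nextBatch(int maxSize);
    boolean isFinished();
}

// Hands out a fixed list of files in batches.
final class FixedSplitSource implements SimpleSplitSource {
    private final List<String> files;
    private int position;

    FixedSplitSource(List<String> files) {
        this.files = List.copyOf(files); // snapshot of the files present at creation time
    }

    public List<String> nextBatch(int maxSize) {
        int end = Math.min(files.size(), position + maxSize);
        List<String> batch = new ArrayList<>(files.subList(position, end));
        position = end;
        return batch;
    }

    public boolean isFinished() {
        return position >= files.size();
    }
}

// Defers creating the real source until splits are first requested, i.e. until the
// section writing the temporary table has finished and its files exist.
final class LazySplitSource implements SimpleSplitSource {
    private final Supplier<SimpleSplitSource> factory;
    private SimpleSplitSource delegate;

    LazySplitSource(Supplier<SimpleSplitSource> factory) {
        this.factory = factory;
    }

    private SimpleSplitSource delegate() {
        if (delegate == null) {
            delegate = factory.get();
        }
        return delegate;
    }

    public List<String> nextBatch(int maxSize) {
        return delegate().nextBatch(maxSize);
    }

    public boolean isFinished() {
        return delegate().isFinished();
    }
}
```

The design choice is that planning only needs the handle, and listing files waits for the scheduler's first split request. An eagerly created source would snapshot an empty table.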