-
Notifications
You must be signed in to change notification settings - Fork 409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Planner: support Join #5320
Planner: support Join #5320
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
/run-all-tests |
Coverage for changed files
Coverage summary
full coverage report (for internal network access only) |
Expression: <cast after aggregation> | ||
SharedQuery: <restore concurrency> | ||
ParallelAggregating, max_threads: 10, final: true | ||
Expression x 10: <before aggregation> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
projection action to remove useless column.
The output columns of join are {r_a, r_b, join_c, l_a, l_b, join_c}.
The columns agg needed are {r_a, join_c}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because now PhysicalPlan::outputAndOptimize
can work on Join
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, after optimize, Useless columns can be destructed earlier.
@@ -27,7 +28,7 @@ PhysicalPlanNodePtr PhysicalSource::build( | |||
NamesAndTypes schema; | |||
for (const auto & col : sample_block) | |||
schema.emplace_back(col.name, col.type); | |||
return std::make_shared<PhysicalSource>("source", schema, log->identifier(), sample_block, source_streams); | |||
return std::make_shared<PhysicalSource>(executor_id, schema, log->identifier(), sample_block, source_streams); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order for the physical node to get the executor_id of the actual child
auto left = popBack(); | ||
|
||
// use for gtest_physical_plan | ||
if (dagContext().isTest() && right->tp() != PlanType::Source) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After DAGQueryBlock removed, it will be removed
physical_plan.buildSource(input_streams); | ||
RUNTIME_ASSERT(!input_streams_vec[i].empty(), log, "input streams cannot be empty"); | ||
assert(query_block.children[i] && query_block.children[i]->root && query_block.children[i]->root->has_executor_id()); | ||
physical_plan.buildSource(query_block.children[i]->root->executor_id(), input_streams_vec[i]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
query_block.children[i]->root->executor_id()
In order for the physical node to get the executor_id of the actual child
Expression: <cast after aggregation> | ||
SharedQuery: <restore concurrency> | ||
ParallelAggregating, max_threads: 10, final: true | ||
Expression x 10: <before aggregation> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Expression: <append join key and join filters for probe side> | ||
Expression: <final projection> | ||
MockTableScan | ||
Expression x 10: <before aggregation> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
MockExchangeReceiver | ||
Expression x 20: <remove useless column after join> | ||
NonJoined: <add stream with non_joined_data if full_or_right_join>)"; | ||
Expression x 20: <before aggregation> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Expression: <append join key and join filters for probe side> | ||
Expression: <final projection> | ||
MockExchangeReceiver | ||
Expression x 20: <before aggregation> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
cfd6b91
to
2dc2a34
Compare
/run-all-tests |
Coverage for changed files
Coverage summary
full coverage report (for internal network access only) |
05fd9bd
to
7693e14
Compare
/run-all-tests |
Coverage for changed files
Coverage summary
full coverage report (for internal network access only) |
RUNTIME_ASSERT(!input_streams.empty(), log, "input streams cannot be empty"); | ||
physical_plan.buildSource(input_streams); | ||
RUNTIME_ASSERT(!input_streams_vec[i].empty(), log, "input streams cannot be empty"); | ||
assert(query_block.children[i] && query_block.children[i]->root && query_block.children[i]->root->has_executor_id()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not true for some list based executors(sent from TiSpark), do we plan to support that in Planner?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for list based executors, input_streams_vec.size() == 0.
So no error will occur here for list based executors :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we plan to support that in Planner?
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why list based executors has input_streams_vec.size() == 0 ?
does it take another interpreter path?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because for list based executors , there will only be one query block.
And the source of query block will only be table scan.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for TableScan, the input_streams_vec must be empty
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got
Expression: <cast after aggregation> | ||
SharedQuery: <restore concurrency> | ||
ParallelAggregating, max_threads: 10, final: true | ||
Expression x 10: <before aggregation> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because now PhysicalPlan::outputAndOptimize
can work on Join
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
others lgtm
Co-authored-by: Zhi Qi <[email protected]>
/merge |
@SeaRise: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: a55ff0a
|
/merge |
@SeaRise: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: be40431
|
Coverage for changed files
Coverage summary
full coverage report (for internal network access only) |
What problem does this PR solve?
Issue Number: ref #4739
Problem Summary:
What is changed and how it works?
PhysicalJoin
PhysicalSource.executor_id = child_query_block->root->executor_id
andFinalProjection.executor_id = child_physical_plan.executor_id
, In order for the physical node to get the executor_id of the actual child.Check List
Tests
Side effects
Documentation
Release note