Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Planner: planner without DAGQueryBlock #5381

Merged
merged 37 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
05fd9bd
Feature Branch: merge master to `planner_refactory` branch (#5353)
SeaRise Jul 12, 2022
0d7ef30
u
SeaRise Jul 14, 2022
09a5e03
update
SeaRise Jul 15, 2022
e5b8cda
udpate
SeaRise Jul 15, 2022
1bda85a
y
SeaRise Jul 15, 2022
c53dcff
fix
SeaRise Jul 15, 2022
8312b95
merge
SeaRise Jul 18, 2022
0676e16
update
SeaRise Jul 18, 2022
353a68b
update
SeaRise Jul 20, 2022
e97bb7b
update
SeaRise Jul 20, 2022
f0f6d86
Update dbms/src/Flash/Planner/PlanQuerySource.cpp
SeaRise Jul 20, 2022
a9037af
u
SeaRise Jul 20, 2022
8046f68
Merge branch 'remove_qb' of https://github.com/SeaRise/tiflash into r…
SeaRise Jul 20, 2022
65a7e3e
add disable planner test
SeaRise Jul 25, 2022
69d672f
u
SeaRise Jul 25, 2022
1b6064c
Merge branch 'planner_refactory' into remove_qb
SeaRise Jul 25, 2022
b5ff7cf
merge planner_refactory
SeaRise Jul 25, 2022
03a0ab5
fix
SeaRise Jul 25, 2022
4a25811
Merge branch 'planner_refactory' into remove_qb
SeaRise Jul 26, 2022
6e2dcc7
wait for tests fixed
SeaRise Jul 26, 2022
a098564
update
SeaRise Jul 26, 2022
c13c26c
fix some tests
SeaRise Jul 26, 2022
ec1d177
fix PlannerInterpreterExecuteTest
SeaRise Jul 26, 2022
3c7d777
fix window
SeaRise Jul 26, 2022
b1d82c6
add some join tests
SeaRise Jul 26, 2022
2e83735
executeUnion before join probe
SeaRise Jul 26, 2022
efe0b7d
add ln
SeaRise Jul 26, 2022
228226f
remove debug codes
SeaRise Jul 26, 2022
c7ea33c
record order for listbase executors
SeaRise Jul 27, 2022
fab303d
remove useless include
SeaRise Jul 27, 2022
345e110
add
SeaRise Jul 27, 2022
271c83e
fix tests
SeaRise Jul 27, 2022
ce35f9d
address comments
SeaRise Jul 28, 2022
23937bd
address comments
SeaRise Aug 1, 2022
0a4b465
address comments
SeaRise Aug 2, 2022
3140faa
update
SeaRise Aug 2, 2022
00409a5
fix test
SeaRise Aug 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
#include <Flash/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>
#include <pingcap/Exception.h>
Expand Down Expand Up @@ -89,10 +88,9 @@ void DAGDriver<batch>::execute()
try
{
auto start_time = Clock::now();
DAGQuerySource dag(context);
DAGContext & dag_context = *context.getDAGContext();

BlockIO streams = executeQuery(dag, context, internal, QueryProcessingStage::Complete);
BlockIO streams = executeQuery(context, internal, QueryProcessingStage::Complete);
if (!streams.in || streams.out)
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal);
Expand Down
45 changes: 7 additions & 38 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Planner/Planner.h>
#include <Interpreters/Context.h>

namespace DB
Expand Down Expand Up @@ -55,49 +53,20 @@ BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block)
BlockInputStreams child_streams = executeQueryBlock(*child);
input_streams_vec.push_back(child_streams);
}
if (context.getSettingsRef().enable_planner && Planner::isSupported(query_block))
{
LOG_FMT_DEBUG(dagContext().log, "use planer for query block with source {}", query_block.source_name);
Planner planner(
context,
input_streams_vec,
query_block,
max_streams);
return planner.execute();
}
else
{
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams);
return query_block_interpreter.execute();
}
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams);
return query_block_interpreter.execute();
Comment on lines +56 to +61
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Restored to their original

}

BlockIO InterpreterDAG::execute()
{
BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock());
DAGPipeline pipeline;
pipeline.streams = streams;
/// add union to run in parallel if needed
if (unlikely(dagContext().isTest()))
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for test");
else if (dagContext().isMPPTask())
/// MPPTask do not need the returned blocks.
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true, "for mpp");
else
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for non mpp");
if (dagContext().hasSubquery())
{
const Settings & settings = context.getSettingsRef();
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
pipeline.firstStream(),
std::move(dagContext().moveSubqueries()),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
dagContext().log->identifier());
}
executeCreatingSets(pipeline, context, max_streams, dagContext().log);
BlockIO res;
res.in = pipeline.firstStream();
return res;
Expand Down
27 changes: 27 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
Expand Down Expand Up @@ -165,4 +166,30 @@ void orderStreams(
log->identifier());
}
}

void executeCreatingSets(
DAGPipeline & pipeline,
const Context & context,
size_t max_streams,
const LoggerPtr & log)
{
DAGContext & dag_context = *context.getDAGContext();
/// add union to run in parallel if needed
if (unlikely(dag_context.isTest()))
executeUnion(pipeline, max_streams, log, /*ignore_block=*/false, "for test");
else if (dag_context.isMPPTask())
/// MPPTask do not need the returned blocks.
executeUnion(pipeline, max_streams, log, /*ignore_block=*/true, "for mpp");
else
executeUnion(pipeline, max_streams, log, /*ignore_block=*/false, "for non mpp");
if (dag_context.hasSubquery())
{
const Settings & settings = context.getSettingsRef();
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
pipeline.firstStream(),
std::move(dag_context.moveSubqueries()),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
log->identifier());
}
}
} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,10 @@ void orderStreams(
bool enable_fine_grained_shuffle,
const Context & context,
const LoggerPtr & log);

void executeCreatingSets(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

port from #5466

DAGPipeline & pipeline,
const Context & context,
size_t max_streams,
const LoggerPtr & log);
} // namespace DB
5 changes: 2 additions & 3 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
#include <Flash/Mpp/MinTSOScheduler.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Flash/executeQuery.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>
#include <fmt/core.h>
Expand Down Expand Up @@ -318,8 +318,7 @@ void MPPTask::preprocess()
{
auto start_time = Clock::now();
initExchangeReceivers();
DAGQuerySource dag(*context);
executeQuery(dag, *context, false, QueryProcessingStage::Complete);
executeQuery(*context);
auto end_time = Clock::now();
dag_context->compile_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count();
mpp_task_statistics.setCompileTimestamp(start_time, end_time);
Expand Down
23 changes: 4 additions & 19 deletions dbms/src/Flash/Planner/FinalizeHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,28 +118,13 @@ void checkSampleBlockContainsSchema(const Block & sample_block, const NamesAndTy
}
}

void checkSchemaContainsSampleBlock(const NamesAndTypes & schema, const Block & sample_block)
void checkSampleBlockContainsParentRequire(const Block & sample_block, const Names & parent_require)
{
std::unordered_map<String, DataTypePtr> schema_map;
for (const auto & column : schema)
schema_map[column.name] = column.type;
for (const auto & sample_block_column : sample_block)
for (const auto & parent_require_column : parent_require)
{
auto it = schema_map.find(sample_block_column.name);
if (unlikely(it == schema_map.end()))
if (unlikely(!sample_block.has(parent_require_column)))
throw TiFlashException(
fmt::format("schema {} don't contain sample block column: {}", schemaToString(schema), sample_block_column.name),
Errors::Planner::Internal);

const auto & type_in_schema = it->second->getName();
const auto & type_in_sample_block = sample_block_column.type->getName();
if (unlikely(type_in_sample_block != type_in_schema))
throw TiFlashException(
fmt::format(
"the type of column `{}` in schema `{}` is different from the one in sample block `{}`",
sample_block_column.name,
type_in_schema,
type_in_sample_block),
fmt::format("sample block {} don't contain parent_require column: {}", blockMetaToString(sample_block), parent_require_column),
Errors::Planner::Internal);
}
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/FinalizeHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ void checkParentRequireContainsSchema(const Names & parent_require, const NamesA

void checkSampleBlockContainsSchema(const Block & sample_block, const NamesAndTypes & schema);

void checkSchemaContainsSampleBlock(const NamesAndTypes & schema, const Block & sample_block);
void checkSampleBlockContainsParentRequire(const Block & sample_block, const Names & parent_require);
} // namespace DB::FinalizeHelper
54 changes: 31 additions & 23 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include <Flash/Planner/plans/PhysicalMockExchangeSender.h>
#include <Flash/Planner/plans/PhysicalMockTableScan.h>
#include <Flash/Planner/plans/PhysicalProjection.h>
#include <Flash/Planner/plans/PhysicalSource.h>
#include <Flash/Planner/plans/PhysicalTableScan.h>
#include <Flash/Planner/plans/PhysicalTopN.h>
#include <Flash/Planner/plans/PhysicalWindow.h>
Expand All @@ -53,6 +52,16 @@ bool pushDownSelection(const PhysicalPlanNodePtr & plan, const String & executor
}
return false;
}

void fillOrderForListBasedExecutors(DAGContext & dag_context, const PhysicalPlanNodePtr & root_node)
{
auto & list_based_executors_order = dag_context.list_based_executors_order;
PhysicalPlanVisitor::visitPostOrder(root_node, [&](const PhysicalPlanNodePtr & plan) {
assert(plan);
if (plan->isRecordProfileStreams())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

likely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But FinalProjection.isRecordProfileStreams() == false, so I think it's better not to add likely

list_based_executors_order.push_back(plan->execId());
});
}
} // namespace

void PhysicalPlan::build(const tipb::DAGRequest * dag_request)
Expand Down Expand Up @@ -93,6 +102,7 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec
break;
case tipb::ExecType::TypeExchangeSender:
{
buildFinalProjection(fmt::format("{}_", executor_id), true);
if (unlikely(dagContext().isTest()))
pushBack(PhysicalMockExchangeSender::build(executor_id, log, popBack()));
else
Expand Down Expand Up @@ -129,27 +139,13 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec
}
case tipb::ExecType::TypeJoin:
{
auto right = popBack();
auto left = popBack();

/// Both sides of the join need to have non-root-final-projection to ensure that
/// there are no duplicate columns in the blocks on the build and probe sides.
buildFinalProjection(fmt::format("{}_r_", executor_id), false);
auto right = popBack();

/// After DAGQueryBlock removed, `dagContext().isTest() && right->tp() != PlanType::Source`
/// and `dagContext().isTest() && right->tp() != PlanType::Source` will be removed.
if (dagContext().isTest() && right->tp() != PlanType::Source)
{
pushBack(right);
buildFinalProjection(fmt::format("{}_r_", executor_id), false);
right = popBack();
}

if (dagContext().isTest() && right->tp() != PlanType::Source)
{
pushBack(left);
buildFinalProjection(fmt::format("{}_l_", executor_id), false);
left = popBack();
}
buildFinalProjection(fmt::format("{}_l_", executor_id), false);
auto left = popBack();

pushBack(PhysicalJoin::build(context, executor_id, log, executor->join(), left, right));
break;
Expand Down Expand Up @@ -199,29 +195,41 @@ PhysicalPlanNodePtr PhysicalPlan::popBack()
return back;
}

void PhysicalPlan::buildSource(const String & executor_id, const BlockInputStreams & source_streams)
/// For MPP, root final projection has been added under PhysicalExchangeSender or PhysicalMockExchangeSender.
/// For batchcop/cop that without PhysicalExchangeSender or PhysicalMockExchangeSender, We need to add root final projection.
void PhysicalPlan::addRootFinalProjectionIfNeed()
{
pushBack(PhysicalSource::build(executor_id, source_streams, log));
assert(root_node);
if (root_node->tp() != PlanType::ExchangeSender && root_node->tp() != PlanType::MockExchangeSender)
{
pushBack(root_node);
buildFinalProjection(fmt::format("{}_", root_node->execId()), true);
root_node = popBack();
}
}

void PhysicalPlan::outputAndOptimize()
{
RUNTIME_ASSERT(!root_node, log, "root_node shoud be nullptr before `outputAndOptimize`");
RUNTIME_ASSERT(cur_plan_nodes.size() == 1, log, "There can only be one plan node output, but here are {}", cur_plan_nodes.size());

root_node = popBack();
addRootFinalProjectionIfNeed();

LOG_FMT_DEBUG(
log,
"build unoptimized physical plan: \n{}",
toString());

root_node = optimize(context, root_node);
root_node = optimize(context, root_node, log);
LOG_FMT_DEBUG(
log,
"build optimized physical plan: \n{}",
toString());

RUNTIME_ASSERT(root_node, log, "root_node shoudn't be nullptr after `outputAndOptimize`");

if (!dagContext().return_executor_id)
fillOrderForListBasedExecutors(dagContext(), root_node);
}

String PhysicalPlan::toString() const
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Flash/Planner/PhysicalPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ class PhysicalPlan

void build(const tipb::DAGRequest * dag_request);

void build(const String & executor_id, const tipb::Executor * executor);

void buildSource(const String & executor_id, const BlockInputStreams & source_streams);

void buildFinalProjection(const String & column_prefix, bool is_root);

// after outputAndOptimize, the physical plan node tree is done.
void outputAndOptimize();

Expand All @@ -48,6 +42,12 @@ class PhysicalPlan
void transform(DAGPipeline & pipeline, Context & context, size_t max_streams);

private:
void addRootFinalProjectionIfNeed();

void build(const String & executor_id, const tipb::Executor * executor);

void buildFinalProjection(const String & column_prefix, bool is_root);

PhysicalPlanNodePtr popBack();

void pushBack(const PhysicalPlanNodePtr & plan);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class PhysicalPlanNode
/// Obtain a sample block that contains the names and types of result columns.
virtual const Block & getSampleBlock() const = 0;

bool isRecordProfileStreams() const { return is_record_profile_streams; }

void disableRecordProfileStreams() { is_record_profile_streams = false; }

void disableRestoreConcurrency() { is_restore_concurrency = false; }
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,17 @@ void visit(const PhysicalPlanNodePtr & plan, FF && f)
}
}

/// visit physical plan node tree in reverse order and apply function.
/// f: (const PhysicalPlanNodePtr &).
template <typename FF>
void visitPostOrder(const PhysicalPlanNodePtr & plan, FF && f)
{
for (size_t i = 0; i < plan->childrenSize(); ++i)
{
visitPostOrder(plan->children(i), std::forward<FF>(f));
}
f(plan);
}

String visitToString(const PhysicalPlanNodePtr & plan);
} // namespace DB::PhysicalPlanVisitor
Loading