From a476307afa27c5e966bc6dabb7d63f4ea93b64c1 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 26 Jul 2022 12:49:10 +0800 Subject: [PATCH] refine `InterpreterDAG` (#5466) ref pingcap/tiflash#4739 --- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 2 +- dbms/src/Flash/Coprocessor/InterpreterDAG.cpp | 19 +------------ .../Flash/Coprocessor/InterpreterUtils.cpp | 27 +++++++++++++++++++ dbms/src/Flash/Coprocessor/InterpreterUtils.h | 6 +++++ 4 files changed, 35 insertions(+), 19 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 2dce59c249c..7fa32c316a1 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -647,7 +647,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) "execution stream size for query block(before aggregation) {} is {}", query_block.qb_column_prefix, pipeline.streams.size()); - dagContext().final_concurrency = std::min(std::max(dagContext().final_concurrency, pipeline.streams.size()), max_streams); + dagContext().updateFinalConcurrency(pipeline.streams.size(), max_streams); if (res.before_aggregation) { diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index 0e767d65d77..61249f19642 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include #include @@ -67,23 +66,7 @@ BlockIO InterpreterDAG::execute() BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock()); DAGPipeline pipeline; pipeline.streams = streams; - /// add union to run in parallel if needed - if (unlikely(dagContext().isTest())) - executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for test"); - else if (dagContext().isMPPTask()) - /// MPPTask do not need the returned blocks. - executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true, "for mpp"); - else - executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for non mpp"); - if (dagContext().hasSubquery()) - { - const Settings & settings = context.getSettingsRef(); - pipeline.firstStream() = std::make_shared( - pipeline.firstStream(), - std::move(dagContext().moveSubqueries()), - SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode), - dagContext().log->identifier()); - } + executeCreatingSets(pipeline, context, max_streams, dagContext().log); BlockIO res; res.in = pipeline.firstStream(); return res; diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 002a06d07b9..034643a6514 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -165,4 +166,30 @@ void orderStreams( log->identifier()); } } + +void executeCreatingSets( + DAGPipeline & pipeline, + const Context & context, + size_t max_streams, + const LoggerPtr & log) +{ + DAGContext & dag_context = *context.getDAGContext(); + /// add union to run in parallel if needed + if (unlikely(dag_context.isTest())) + executeUnion(pipeline, max_streams, log, /*ignore_block=*/false, "for test"); + else if (dag_context.isMPPTask()) + /// MPPTask do not need the returned blocks. + executeUnion(pipeline, max_streams, log, /*ignore_block=*/true, "for mpp"); + else + executeUnion(pipeline, max_streams, log, /*ignore_block=*/false, "for non mpp"); + if (dag_context.hasSubquery()) + { + const Settings & settings = context.getSettingsRef(); + pipeline.firstStream() = std::make_shared( + pipeline.firstStream(), + std::move(dag_context.moveSubqueries()), + SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode), + log->identifier()); + } +} } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index bd64346718c..e2ebcbc5395 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -60,4 +60,10 @@ void orderStreams( bool enable_fine_grained_shuffle, const Context & context, const LoggerPtr & log); + +void executeCreatingSets( + DAGPipeline & pipeline, + const Context & context, + size_t max_streams, + const LoggerPtr & log); } // namespace DB