Skip to content

Commit

Permalink
refine InterpreterDAG (#5466)
Browse files Browse the repository at this point in the history
ref #4739
  • Loading branch information
SeaRise authored Jul 26, 2022
1 parent 679eda6 commit a476307
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 19 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
"execution stream size for query block(before aggregation) {} is {}",
query_block.qb_column_prefix,
pipeline.streams.size());
dagContext().final_concurrency = std::min(std::max(dagContext().final_concurrency, pipeline.streams.size()), max_streams);
dagContext().updateFinalConcurrency(pipeline.streams.size(), max_streams);

if (res.before_aggregation)
{
Expand Down
19 changes: 1 addition & 18 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
Expand Down Expand Up @@ -67,23 +66,7 @@ BlockIO InterpreterDAG::execute()
BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock());
DAGPipeline pipeline;
pipeline.streams = streams;
/// add union to run in parallel if needed
if (unlikely(dagContext().isTest()))
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for test");
else if (dagContext().isMPPTask())
/// MPPTask do not need the returned blocks.
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true, "for mpp");
else
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for non mpp");
if (dagContext().hasSubquery())
{
const Settings & settings = context.getSettingsRef();
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
pipeline.firstStream(),
std::move(dagContext().moveSubqueries()),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
dagContext().log->identifier());
}
executeCreatingSets(pipeline, context, max_streams, dagContext().log);
BlockIO res;
res.in = pipeline.firstStream();
return res;
Expand Down
27 changes: 27 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
Expand Down Expand Up @@ -165,4 +166,30 @@ void orderStreams(
log->identifier());
}
}

void executeCreatingSets(
DAGPipeline & pipeline,
const Context & context,
size_t max_streams,
const LoggerPtr & log)
{
DAGContext & dag_context = *context.getDAGContext();
/// add union to run in parallel if needed
if (unlikely(dag_context.isTest()))
executeUnion(pipeline, max_streams, log, /*ignore_block=*/false, "for test");
else if (dag_context.isMPPTask())
/// MPPTask do not need the returned blocks.
executeUnion(pipeline, max_streams, log, /*ignore_block=*/true, "for mpp");
else
executeUnion(pipeline, max_streams, log, /*ignore_block=*/false, "for non mpp");
if (dag_context.hasSubquery())
{
const Settings & settings = context.getSettingsRef();
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
pipeline.firstStream(),
std::move(dag_context.moveSubqueries()),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
log->identifier());
}
}
} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,10 @@ void orderStreams(
bool enable_fine_grained_shuffle,
const Context & context,
const LoggerPtr & log);

void executeCreatingSets(
DAGPipeline & pipeline,
const Context & context,
size_t max_streams,
const LoggerPtr & log);
} // namespace DB

0 comments on commit a476307

Please sign in to comment.