From c14a5e9aae1b194f365cafd4de8b15e5f805a39c Mon Sep 17 00:00:00 2001 From: bestwoody Date: Fri, 25 Mar 2022 14:06:51 +0800 Subject: [PATCH 1/4] close receiver early Signed-off-by: bestwoody --- dbms/src/DataStreams/TiRemoteBlockInputStream.h | 6 ++++++ dbms/src/Flash/Coprocessor/DAGContext.cpp | 8 ++++++++ dbms/src/Flash/Coprocessor/DAGContext.h | 2 ++ dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 10 ++++++++-- dbms/src/Flash/Mpp/ExchangeReceiver.h | 4 ++++ dbms/src/Flash/Mpp/MPPTask.cpp | 2 ++ 6 files changed, 30 insertions(+), 2 deletions(-) diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index adef565ea71..444b0b08acd 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -215,6 +215,12 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream return block; } + void readSuffix() override + { + IProfilingBlockInputStream::readSuffix(); + remote_reader->close(); + } + const std::unordered_map * getRemoteExecutionSummaries(size_t index) { return execution_summaries_inited[index].load() ? &execution_summaries[index] : nullptr; diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index cd814e6fa18..a38eeef3145 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -221,6 +221,14 @@ const std::unordered_map> & DAGContext return mpp_exchange_receiver_map; } +void DAGContext::cancelAllExchangeReceiver() +{ + for (auto & it : mpp_exchange_receiver_map) + { + it.second->cancel(); + } +} + int DAGContext::getNewThreadCountOfExchangeReceiver() const { return new_thread_count_of_exchange_receiver; diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 601c7af2d3d..b1c92a9035e 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -266,6 +266,8 @@ class DAGContext return sql_mode & f; } + void cancelAllExchangeReceiver(); + void initExchangeReceiverIfMPP(Context & context, size_t max_streams); const std::unordered_map> & getMPPExchangeReceiverMap() const; diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 802315b9676..a0ff315a6d1 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -311,8 +311,7 @@ ExchangeReceiverBase::ExchangeReceiverBase( template ExchangeReceiverBase::~ExchangeReceiverBase() { - setState(ExchangeReceiverState::CLOSED); - msg_channel.finish(); + close(); thread_manager->wait(); } @@ -323,6 +322,13 @@ void ExchangeReceiverBase::cancel() msg_channel.finish(); } +template +void ExchangeReceiverBase::close() +{ + setState(ExchangeReceiverState::CLOSED); + msg_channel.finish(); +} + template void ExchangeReceiverBase::setUpConnection() { diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 7196eb17c71..32defa2eaec 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -96,8 +96,12 @@ class ExchangeReceiverBase ~ExchangeReceiverBase(); + // idempotent cancel operation void cancel(); + // idempotent close operation + void close(); + const DAGSchema & getOutputSchema() const { return schema; } ExchangeReceiverResult nextResult( diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 59357823058..43d6949597b 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -352,6 +352,8 @@ void MPPTask::runImpl() else { context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true); + if (dag_context) + dag_context->cancelAllExchangeReceiver(); writeErrToAllTunnels(err_msg); } LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds()); From 431a8c62c920f20781c8eb35f101743473113452 Mon Sep 17 00:00:00 2001 From: bestwoody Date: Fri, 25 Mar 2022 14:24:29 +0800 Subject: [PATCH 2/4] update Signed-off-by: bestwoody --- dbms/src/Flash/Coprocessor/CoprocessorReader.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index 98a8f399237..8a3eb471e54 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -95,6 +95,7 @@ class CoprocessorReader void cancel() { resp_iter.cancel(); } + static DecodeDetail decodeChunks( const std::shared_ptr & resp, std::queue & block_queue, @@ -187,6 +188,8 @@ class CoprocessorReader collected = false; } + void close() {} + bool collected = false; int concurrency_; }; From 382b3b4201f96490c8caaad7f25e88914029f245 Mon Sep 17 00:00:00 2001 From: bestwoody Date: Fri, 25 Mar 2022 14:26:15 +0800 Subject: [PATCH 3/4] update Signed-off-by: bestwoody --- dbms/src/DataStreams/TiRemoteBlockInputStream.h | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index be60481fe48..3e38d629d47 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -215,12 +215,6 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream return block; } - void readSuffix() override - { - IProfilingBlockInputStream::readSuffix(); - remote_reader->close(); - } - const std::unordered_map * getRemoteExecutionSummaries(size_t index) { return execution_summaries_inited[index].load() ? &execution_summaries[index] : nullptr; @@ -247,6 +241,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream virtual void readSuffix() override { LOG_FMT_DEBUG(log, "finish read {} rows from remote", total_rows); + remote_reader->close(); } }; From 2335e406d3cd9454ed59c0e79c8da82efa67dd42 Mon Sep 17 00:00:00 2001 From: bestwoody Date: Fri, 25 Mar 2022 16:43:03 +0800 Subject: [PATCH 4/4] add state check of exchgRecv Signed-off-by: bestwoody --- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 10 ++++++++++ dbms/src/Flash/Mpp/ExchangeReceiver.h | 2 -- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index be61be17926..43a15d87bcf 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -318,6 +318,11 @@ ExchangeReceiverBase::~ExchangeReceiverBase() template void ExchangeReceiverBase::cancel() { + auto cur_state = getState(); + if (cur_state == ExchangeReceiverState::CANCELED || cur_state == ExchangeReceiverState::CLOSED) + { + return; + } setState(ExchangeReceiverState::CANCELED); msg_channel.finish(); } @@ -325,6 +330,11 @@ void ExchangeReceiverBase::cancel() template void ExchangeReceiverBase::close() { + auto cur_state = getState(); + if (cur_state == ExchangeReceiverState::CANCELED || cur_state == ExchangeReceiverState::CLOSED) + { + return; + } setState(ExchangeReceiverState::CLOSED); msg_channel.finish(); } diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 32defa2eaec..c5453b98fbe 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -96,10 +96,8 @@ class ExchangeReceiverBase ~ExchangeReceiverBase(); - // idempotent cancel operation void cancel(); - // idempotent close operation void close(); const DAGSchema & getOutputSchema() const { return schema; }