From f601a461b2e7ce6a782c8d3a285bb73ef3df1ef7 Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 22 Sep 2022 19:15:04 +0800 Subject: [PATCH] This is an automated cherry-pick of #5966 Signed-off-by: ti-chi-bot --- dbms/src/Common/FailPoint.cpp | 3 - dbms/src/Common/MPMCQueue.h | 9 + dbms/src/Flash/EstablishCall.cpp | 15 + dbms/src/Flash/Mpp/GRPCSendQueue.h | 277 ++++++++++++++++++ dbms/src/Flash/Mpp/MPPHandler.cpp | 16 +- dbms/src/Flash/Mpp/MPPTask.cpp | 73 ++--- dbms/src/Flash/Mpp/MPPTask.h | 13 +- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 67 ++++- dbms/src/Flash/Mpp/MPPTaskManager.h | 24 ++ dbms/src/Flash/Mpp/MPPTunnel.cpp | 80 ++++- dbms/src/Flash/Mpp/MPPTunnel.h | 71 ++++- dbms/src/Flash/Mpp/MPPTunnelSet.cpp | 14 +- dbms/src/Flash/Mpp/MPPTunnelSet.h | 3 +- dbms/src/Flash/Mpp/MinTSOScheduler.cpp | 4 + .../Flash/Mpp/tests/gtest_grpc_send_queue.cpp | 187 ++++++++++++ dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 104 ++++--- tests/fullstack-test/mpp/mpp_fail.test | 3 + 17 files changed, 854 insertions(+), 109 deletions(-) create mode 100644 dbms/src/Flash/Mpp/GRPCSendQueue.h create mode 100644 dbms/src/Flash/Mpp/tests/gtest_grpc_send_queue.cpp diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index ad5010d7826..cb1a2a00c86 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -56,8 +56,6 @@ std::unordered_map> FailPointHelper::f M(exception_before_mpp_register_tunnel_for_root_mpp_task) \ M(exception_before_mpp_root_task_run) \ M(exception_during_mpp_root_task_run) \ - M(exception_during_mpp_write_err_to_tunnel) \ - M(exception_during_mpp_close_tunnel) \ M(exception_during_write_to_storage) \ M(force_set_sst_to_dtfile_block_size) \ M(force_set_sst_decode_rand) \ @@ -126,7 +124,6 @@ std::unordered_map> FailPointHelper::f M(random_aggregate_merge_failpoint) \ M(random_sharedquery_failpoint) \ M(random_interpreter_failpoint) \ - M(random_task_lifecycle_failpoint) \ M(random_task_manager_find_task_failure_failpoint) \ M(random_min_tso_scheduler_failpoint) diff --git a/dbms/src/Common/MPMCQueue.h b/dbms/src/Common/MPMCQueue.h index 42aad66e807..357367cffd5 100644 --- a/dbms/src/Common/MPMCQueue.h +++ b/dbms/src/Common/MPMCQueue.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -70,9 +71,13 @@ class MPMCQueue ~MPMCQueue() { +<<<<<<< HEAD std::unique_lock lock(mu); for (; read_pos < write_pos; ++read_pos) destruct(getObj(read_pos)); +======= + drain(); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) } // Cannot to use copy/move constructor, @@ -208,7 +213,11 @@ class MPMCQueue return write_pos - read_pos < capacity || !isNormal(); } +<<<<<<< HEAD MPMCQueueStatus getStatus() const +======= + const String & getCancelReason() const +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) { std::unique_lock lock(mu); return status; diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index 62a10e3afb6..f5218950450 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -161,6 +161,7 @@ void EstablishCallData::proceed() { if (state == NEW_REQUEST) { +<<<<<<< HEAD state = PROCESSING; spawn(service, cq, notify_cq, is_shutdown); @@ -196,6 +197,20 @@ void EstablishCallData::proceed() // Once in the FINISH state, deallocate ourselves (EstablishCallData). // That't the way GRPC official examples do. link: https://github.com/grpc/grpc/blob/master/examples/cpp/helloworld/greeter_async_server.cc delete this; +======= + case GRPCSendQueueRes::OK: + write(res->packet); + return; + case GRPCSendQueueRes::FINISHED: + writeDone("", grpc::Status::OK); + return; + case GRPCSendQueueRes::CANCELLED: + RUNTIME_ASSERT(!async_tunnel_sender->getCancelReason().empty(), "Tunnel sender cancelled without reason"); + writeErr(getPacketWithError(async_tunnel_sender->getCancelReason())); + return; + case GRPCSendQueueRes::EMPTY: + // No new message. +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) return; } } diff --git a/dbms/src/Flash/Mpp/GRPCSendQueue.h b/dbms/src/Flash/Mpp/GRPCSendQueue.h new file mode 100644 index 00000000000..4ed63f9531d --- /dev/null +++ b/dbms/src/Flash/Mpp/GRPCSendQueue.h @@ -0,0 +1,277 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace tests +{ +class TestGRPCSendQueue; +} // namespace tests + +/// In grpc cpp framework, the tag that is pushed into grpc completion +/// queue must be inherited from `CompletionQueueTag`. +class KickTag : public grpc::internal::CompletionQueueTag +{ +public: + explicit KickTag(std::function a) + : action(std::move(a)) + {} + + bool FinalizeResult(void ** tag_, bool * /*status*/) override + { + *tag_ = action(); + return true; + } + +private: + /// `action` is called before the `tag` is popped from completion queue + /// in `FinalizeResult`. + std::function action; +}; + +using GRPCKickFunc = std::function; + +enum class GRPCSendQueueRes +{ + OK, + FINISHED, + EMPTY, + CANCELLED, +}; + +/// A multi-producer-single-consumer queue dedicated to async grpc streaming send work. +/// +/// In streaming rpc, a client/server may send messages continuous. +/// However, async grpc is only allowed to have one outstanding write on the +/// same side of the same stream without waiting for the completion queue. +/// Further more, the message usually is generated from another thread which +/// introduce a race between this thread and grpc threads. +/// The grpc cpp framework provides a tool named `Alarm` can be used to push a tag into +/// completion queue thus the write can be done in grpc threads. But `Alarm` must need +/// a timeout and it uses a timer to trigger the notification, which is wasteful if we want +/// to trigger it immediately. So we can say `kickCompletionQueue` function is a +/// immediately-triggered `Alarm`. +template +class GRPCSendQueue +{ +public: + GRPCSendQueue(size_t queue_size, grpc_call * call, const LoggerPtr & l) + : send_queue(queue_size) + , log(l) + , kick_tag([this]() { return kickTagAction(); }) + { + RUNTIME_ASSERT(call != nullptr, log, "call is null"); + // If a call to `grpc_call_start_batch` with an empty batch returns + // `GRPC_CALL_OK`, the tag is pushed into the completion queue immediately. + // This behavior is well-defined. See https://github.com/grpc/grpc/issues/16357. + kick_func = [call](void * t) { + return grpc_call_start_batch(call, nullptr, 0, t, nullptr); + }; + } + + // For gtest usage. + GRPCSendQueue(size_t queue_size, GRPCKickFunc func) + : send_queue(queue_size) + , log(Logger::get("GRPCSendQueue", "test")) + , kick_func(func) + , kick_tag([this]() { return kickTagAction(); }) + {} + + ~GRPCSendQueue() + { + std::unique_lock lock(mu); + + RUNTIME_ASSERT(status == Status::NONE, log, "status {} is not none", status); + } + + /// Push the data from queue and kick the grpc completion queue. + /// + /// Return true if push succeed. + /// Else return false. + template + bool push(U && u) + { + auto ret = send_queue.push(std::forward(u)) == MPMCQueueResult::OK; + if (ret) + { + kickCompletionQueue(); + } + return ret; + } + + /// Cancel the send queue, and set the cancel reason + bool cancelWith(const String & reason) + { + auto ret = send_queue.cancelWith(reason); + if (ret) + { + kickCompletionQueue(); + } + return ret; + } + + const String & getCancelReason() const + { + return send_queue.getCancelReason(); + } + + /// Pop the data from queue. + /// + /// Return OK if pop is done. + /// Return FINISHED if the queue is finished and empty. + /// Return EMPTY if there is no data in queue and `new_tag` is saved. + /// When the next push/finish is called, the `new_tag` will be pushed + /// into grpc completion queue. + /// Note that any data in `new_tag` mustn't be touched if this function + /// returns EMPTY because this `new_tag` may be popped out in another + /// grpc thread immediately. By the way, if this completion queue is only + /// tied to one grpc thread, this data race will not happen. + GRPCSendQueueRes pop(T & data, void * new_tag) + { + RUNTIME_ASSERT(new_tag != nullptr, log, "new_tag is nullptr"); + + auto res = send_queue.tryPop(data); + switch (res) + { + case MPMCQueueResult::OK: + return GRPCSendQueueRes::OK; + case MPMCQueueResult::FINISHED: + return GRPCSendQueueRes::FINISHED; + case MPMCQueueResult::CANCELLED: + return GRPCSendQueueRes::CANCELLED; + case MPMCQueueResult::EMPTY: + // Handle this case later. + break; + default: + RUNTIME_ASSERT(false, log, "Result {} is invalid", res); + } + + std::unique_lock lock(mu); + + RUNTIME_ASSERT(status == Status::NONE, log, "status {} is not none", status); + + // Double check if this queue is empty. + res = send_queue.tryPop(data); + switch (res) + { + case MPMCQueueResult::OK: + return GRPCSendQueueRes::OK; + case MPMCQueueResult::FINISHED: + return GRPCSendQueueRes::FINISHED; + case MPMCQueueResult::CANCELLED: + return GRPCSendQueueRes::CANCELLED; + case MPMCQueueResult::EMPTY: + { + // If empty, change status to WAITING. + status = Status::WAITING; + tag = new_tag; + return GRPCSendQueueRes::EMPTY; + } + default: + RUNTIME_ASSERT(false, log, "Result {} is invalid", res); + } + } + + /// Finish the queue and kick the grpc completion queue. + /// + /// For return value meanings, see `MPMCQueue::finish`. + bool finish() + { + auto ret = send_queue.finish(); + if (ret) + { + kickCompletionQueue(); + } + return ret; + } + +private: + friend class tests::TestGRPCSendQueue; + + void * kickTagAction() + { + std::unique_lock lock(mu); + + RUNTIME_ASSERT(status == Status::QUEUING, log, "status {} is not queuing", status); + status = Status::NONE; + + return std::exchange(tag, nullptr); + } + + /// Wake up its completion queue. + void kickCompletionQueue() + { + { + std::unique_lock lock(mu); + if (status != Status::WAITING) + { + return; + } + RUNTIME_ASSERT(tag != nullptr, log, "status is waiting but tag is nullptr"); + status = Status::QUEUING; + } + + grpc_call_error error = kick_func(&kick_tag); + // If an error occur, there must be something wrong about shutdown process. + RUNTIME_ASSERT(error == grpc_call_error::GRPC_CALL_OK, log, "grpc_call_start_batch returns {} != GRPC_CALL_OK, memory of tag may leak", error); + } + + MPMCQueue send_queue; + + const LoggerPtr log; + + /// The mutex is used to synchronize the concurrent calls between push/finish and pop. + /// It protects `status` and `tag`. + /// The concurrency problem we want to prevent here is LOST NOTIFICATION which is + /// similar to `condition_variable`. + /// Note that this mutex is necessary. It's useless to just change the `tag` to atomic. + /// + /// Imagine this case: + /// Thread 1: want to pop the data from queue but find no data there. + /// Thread 2: push/finish the data in queue. + /// Thread 2: do not kick the completion queue because tag is nullptr. + /// Thread 1: set the tag. + /// + /// If there is no more data, this connection will get stuck forever. + std::mutex mu; + + enum class Status + { + /// No tag. + NONE, + /// Waiting for kicking. + WAITING, + /// Queuing in the grpc completion queue. + QUEUING, + }; + + Status status = Status::NONE; + void * tag = nullptr; + + GRPCKickFunc kick_func; + + KickTag kick_tag; +}; + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Mpp/MPPHandler.cpp b/dbms/src/Flash/Mpp/MPPHandler.cpp index 7f97a1dd698..c724a2c0c9c 100644 --- a/dbms/src/Flash/Mpp/MPPHandler.cpp +++ b/dbms/src/Flash/Mpp/MPPHandler.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include @@ -28,14 +27,17 @@ extern const char exception_before_mpp_root_task_run[]; void MPPHandler::handleError(const MPPTaskPtr & task, String error) { - try + if (task) { - if (task) + try + { task->handleError(error); - } - catch (...) - { - tryLogCurrentException(log, "Fail to handle error and clean task"); + } + catch (...) + { + tryLogCurrentException(log, "Fail to handle error and clean task"); + } + task->unregisterTask(); } } // execute is responsible for making plan , register tasks and tunnels and start the running thread. diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 9d7b99ed0a9..ff17166852f 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -21,13 +21,10 @@ #include #include #include -#include #include #include #include -#include #include -#include #include #include #include @@ -49,15 +46,14 @@ extern const char exception_before_mpp_register_root_mpp_task[]; extern const char exception_before_mpp_register_tunnel_for_non_root_mpp_task[]; extern const char exception_before_mpp_register_tunnel_for_root_mpp_task[]; extern const char exception_during_mpp_register_tunnel_for_non_root_mpp_task[]; -extern const char exception_during_mpp_write_err_to_tunnel[]; extern const char force_no_local_region_for_mpp_task[]; -extern const char random_task_lifecycle_failpoint[]; } // namespace FailPoints MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_) : context(context_) , meta(meta_) , id(meta.start_ts(), meta.task_id()) + , manager(context_->getTMTContext().getMPPTaskManager().get()) , log(Logger::get("MPPTask", id.toString())) , mpp_task_statistics(id, meta.address()) , needed_threads(0) @@ -70,28 +66,25 @@ MPPTask::~MPPTask() /// to current_memory_tracker in the destructor if (current_memory_tracker != memory_tracker) current_memory_tracker = memory_tracker; - closeAllTunnels(""); + abortTunnels("", true); if (schedule_state == ScheduleState::SCHEDULED) { /// the threads of this task are not fully freed now, since the BlockIO and DAGContext are not destructed /// TODO: finish all threads before here, except the current one. - manager.load()->releaseThreadsFromScheduler(needed_threads); + manager->releaseThreadsFromScheduler(needed_threads); schedule_state = ScheduleState::COMPLETED; } LOG_FMT_DEBUG(log, "finish MPPTask: {}", id.toString()); } -void MPPTask::abortTunnels(const String & message, AbortType abort_type) +void MPPTask::abortTunnels(const String & message, bool wait_sender_finish) { - if (abort_type == AbortType::ONCANCELLATION) { - closeAllTunnels(message); - } - else - { - RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set"); - tunnel_set->writeError(message); + std::unique_lock lock(tunnel_and_receiver_mu); + if (unlikely(tunnel_set == nullptr)) + return; } + tunnel_set->close(message, wait_sender_finish); } void MPPTask::abortReceivers() @@ -109,12 +102,15 @@ void MPPTask::abortDataStreams(AbortType abort_type) context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, is_kill); } +<<<<<<< HEAD void MPPTask::closeAllTunnels(const String & reason) { if (likely(tunnel_set)) tunnel_set->close(reason); } +======= +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) void MPPTask::finishWrite() { RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set"); @@ -187,12 +183,13 @@ void MPPTask::initExchangeReceivers() std::pair MPPTask::getTunnel(const ::mpp::EstablishMPPConnectionRequest * request) { - if (status == CANCELLED) + if (status == CANCELLED || status == FAILED) { auto err_msg = fmt::format( - "can't find tunnel ({} + {}) because the task is cancelled", + "can't find tunnel ({} + {}) because the task is aborted, error message = {}", request->sender_meta().task_id(), - request->receiver_meta().task_id()); + request->receiver_meta().task_id(), + err_string); return {nullptr, err_msg}; } @@ -212,16 +209,20 @@ std::pair MPPTask::getTunnel(const ::mpp::EstablishMPPConn void MPPTask::unregisterTask() { +<<<<<<< HEAD auto * manager_ptr = manager.load(); if (manager_ptr != nullptr) { LOG_DEBUG(log, "task unregistered"); manager_ptr->unregisterTask(this); } +======= + auto [result, reason] = manager->unregisterTask(id); + if (result) + LOG_FMT_DEBUG(log, "task unregistered"); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) else - { - LOG_ERROR(log, "task manager is unset"); - } + LOG_FMT_WARNING(log, "task failed to unregister, reason: {}", reason); } void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request) @@ -332,6 +333,7 @@ void MPPTask::runImpl() if (!switchStatus(INITIALIZING, RUNNING)) { LOG_WARNING(log, "task not in initializing state, skip running"); + unregisterTask(); return; } Stopwatch stopwatch; @@ -419,27 +421,26 @@ void MPPTask::runImpl() } } } - LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds()); - // unregister flag is only for FailPoint usage, to produce the situation that MPPTask is destructed - // by grpc CancelMPPTask thread; - bool unregister = true; - fiu_do_on(FailPoints::random_task_lifecycle_failpoint, { - if (!err_msg.empty()) - unregister = false; - }); - if (unregister) - unregisterTask(); - mpp_task_statistics.end(status.load(), err_string); mpp_task_statistics.logTracingJson(); + + LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds()); + unregisterTask(); } void MPPTask::handleError(const String & error_msg) { +<<<<<<< HEAD auto * manager_ptr = manager.load(); /// if manager_ptr is not nullptr, it means the task has already been registered, /// MPPTaskManager::cancelMPPQuery will handle it properly if the query is to be cancelled. if (manager_ptr == nullptr || !manager_ptr->isQueryToBeCancelled(id.start_ts)) +======= + auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg); + manager->abortMPPQuery(id.start_ts, updated_msg, AbortType::ONERROR); + if (!registered) + // if the task is not registered, need to cancel it explicitly +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) abort(error_msg, AbortType::ONERROR); } @@ -472,8 +473,12 @@ void MPPTask::abort(const String & message, AbortType abort_type) err_string = message; /// if the task is in initializing state, mpp task can return error to TiDB directly, /// so just close all tunnels here +<<<<<<< HEAD closeAllTunnels(message); unregisterTask(); +======= + abortTunnels("", false); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) LOG_WARNING(log, "Finish abort task from uninitialized"); return; } @@ -483,7 +488,7 @@ void MPPTask::abort(const String & message, AbortType abort_type) /// first, the top components may see an error caused by the abort, which is not /// the original error err_string = message; - abortTunnels(message, abort_type); + abortTunnels(message, false); abortDataStreams(abort_type); abortReceivers(); scheduleThisTask(ScheduleState::FAILED); @@ -507,7 +512,7 @@ bool MPPTask::switchStatus(TaskStatus from, TaskStatus to) void MPPTask::scheduleOrWait() { - if (!manager.load()->tryToScheduleTask(shared_from_this())) + if (!manager->tryToScheduleTask(shared_from_this())) { LOG_FMT_INFO(log, "task waits for schedule"); Stopwatch stopwatch; diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index dfa9f8a2ea8..77ee168c860 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -92,6 +92,7 @@ class MPPTask : public std::enable_shared_from_this void unregisterTask(); +<<<<<<< HEAD /// Similar to `writeErrToAllTunnels`, but it just try to write the error message to tunnel /// without waiting the tunnel to be connected void closeAllTunnels(const String & reason); @@ -102,9 +103,12 @@ class MPPTask : public std::enable_shared_from_this ONCANCELLATION, ONERROR, }; +======= + // abort the mpp task, note this function should be non-blocking, it just set some flags +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) void abort(const String & message, AbortType abort_type); - void abortTunnels(const String & message, AbortType abort_type); + void abortTunnels(const String & message, bool wait_sender_finish); void abortReceivers(); void abortDataStreams(AbortType abort_type); @@ -143,13 +147,18 @@ class MPPTask : public std::enable_shared_from_this int new_thread_count_of_exchange_receiver = 0; - std::atomic manager = nullptr; + MPPTaskManager * manager; + std::atomic registered{false}; const LoggerPtr log; MPPTaskStatistics mpp_task_statistics; friend class MPPTaskManager; +<<<<<<< HEAD +======= + friend class MPPHandler; +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) int needed_threads; diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 21e9bb857f8..31123e307be 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -46,7 +46,11 @@ MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std:: { return false; } +<<<<<<< HEAD else if (query_it->second->to_be_cancelled) +======= + else if (!query_it->second->isInNormalState()) +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) { /// if the query is cancelled, return true to stop waiting timeout. LOG_WARNING(log, fmt::format("Query {} is cancelled, all its tasks are invalid.", id.start_ts)); @@ -70,8 +74,14 @@ MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std:: return it->second; } +<<<<<<< HEAD void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason) { +======= +void MPPTaskManager::abortMPPQuery(UInt64 query_id, const String & reason, AbortType abort_type) +{ + LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id, magic_enum::enum_name(abort_type), reason)); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) MPPQueryTaskSetPtr task_set; { /// cancel task may take a long time, so first @@ -84,16 +94,26 @@ void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason) LOG_WARNING(log, fmt::format("{} does not found in task manager, skip cancel", query_id)); return; } +<<<<<<< HEAD else if (it->second->to_be_cancelled) +======= + else if (!it->second->isInNormalState()) +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) { LOG_WARNING(log, fmt::format("{} already in cancel process, skip cancel", query_id)); return; } +<<<<<<< HEAD it->second->to_be_cancelled = true; +======= + it->second->state = MPPQueryTaskSet::Aborting; + it->second->error_message = reason; +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) task_set = it->second; scheduler->deleteQuery(query_id, *this, true); cv.notify_all(); } +<<<<<<< HEAD LOG_WARNING(log, fmt::format("Begin cancel query: {}", query_id)); FmtBuffer fmt_buf; fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id); @@ -106,14 +126,30 @@ void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason) it = task_set->task_map.erase(it); thread_manager->schedule(false, "CancelMPPTask", [task = std::move(current_task), &reason] { task->cancel(reason); }); } +======= + + FmtBuffer fmt_buf; + fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id); + for (auto & it : task_set->task_map) + fmt_buf.fmtAppend("{} ", it.first.toString()); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) LOG_WARNING(log, fmt_buf.toString()); - thread_manager->wait(); + + for (auto & it : task_set->task_map) + it.second->abort(reason, abort_type); + { std::lock_guard lock(mu); auto it = mpp_query_map.find(query_id); +<<<<<<< HEAD /// just to double check the query still exists if (it != mpp_query_map.end()) mpp_query_map.erase(it); +======= + RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id); + it->second->state = MPPQueryTaskSet::Aborted; + cv.notify_all(); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) } LOG_WARNING(log, "Finish cancel query: " + std::to_string(query_id)); } @@ -122,11 +158,17 @@ bool MPPTaskManager::registerTask(MPPTaskPtr task) { std::unique_lock lock(mu); const auto & it = mpp_query_map.find(task->id.start_ts); +<<<<<<< HEAD if (it != mpp_query_map.end() && it->second->to_be_cancelled) { LOG_WARNING(log, "Do not register task: " + task->id.toString() + " because the query is to be cancelled."); cv.notify_all(); return false; +======= + if (it != mpp_query_map.end() && !it->second->isInNormalState()) + { + return {false, fmt::format("query is being aborted, error message = {}", it->second->error_message)}; +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) } if (it != mpp_query_map.end() && it->second->task_map.find(task->id) != it->second->task_map.end()) { @@ -142,11 +184,12 @@ bool MPPTaskManager::registerTask(MPPTaskPtr task) { mpp_query_map[task->id.start_ts]->task_map.emplace(task->id, task); } - task->manager = this; + task->registered = true; cv.notify_all(); return true; } +<<<<<<< HEAD bool MPPTaskManager::isQueryToBeCancelled(UInt64 query_id) { std::unique_lock lock(mu); @@ -163,18 +206,32 @@ void MPPTaskManager::unregisterTask(MPPTask * task) if (it->second->to_be_cancelled) return; auto task_it = it->second->task_map.find(task->id); +======= +std::pair MPPTaskManager::unregisterTask(const MPPTaskId & id) +{ + std::unique_lock lock(mu); + auto it = mpp_query_map.end(); + cv.wait(lock, [&] { + it = mpp_query_map.find(id.start_ts); + return it == mpp_query_map.end() || it->second->allowUnregisterTask(); + }); + if (it != mpp_query_map.end()) + { + auto task_it = it->second->task_map.find(id); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) if (task_it != it->second->task_map.end()) { it->second->task_map.erase(task_it); if (it->second->task_map.empty()) { /// remove query task map if the task is the last one - scheduler->deleteQuery(task->id.start_ts, *this, false); + scheduler->deleteQuery(id.start_ts, *this, false); mpp_query_map.erase(it); } return; } } +<<<<<<< HEAD LOG_ERROR(log, "The task " + task->id.toString() + " cannot be found and fail to unregister"); } @@ -199,6 +256,10 @@ std::vector MPPTaskManager::getCurrentTasksForQuery(UInt64 query_id) for (const auto & task_it : it->second->task_map) ret.push_back(task_it.second); return ret; +======= + cv.notify_all(); + return {false, "task can not be found, maybe not registered yet"}; +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) } String MPPTaskManager::toString() diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index fea65daba51..86d70a298d9 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -27,15 +27,35 @@ namespace DB { struct MPPQueryTaskSet { +<<<<<<< HEAD /// to_be_cancelled is kind of lock, if to_be_cancelled is set /// to true, then task_map can only be accessed by query cancel /// thread, which means no task can register/un-register for the /// query, here we do not need mutex because all the write/read /// to MPPQueryTaskSet is protected by the mutex in MPPTaskManager bool to_be_cancelled = false; +======= + enum State + { + Normal, + Aborting, + Aborted, + }; + /// task can only be registered state is Normal + State state = Normal; + String error_message; +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) MPPTaskMap task_map; /// only used in scheduler std::queue waiting_tasks; + bool isInNormalState() const + { + return state == Normal; + } + bool allowUnregisterTask() const + { + return state == Normal || state == Aborted; + } }; using MPPQueryTaskSetPtr = std::shared_ptr; @@ -71,9 +91,13 @@ class MPPTaskManager : private boost::noncopyable bool registerTask(MPPTaskPtr task); +<<<<<<< HEAD void unregisterTask(MPPTask * task); bool isQueryToBeCancelled(UInt64 query_id); +======= + std::pair unregisterTask(const MPPTaskId & id); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) bool tryToScheduleTask(const MPPTaskPtr & task); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index e14d80aa5bd..98113c9ec3d 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -24,7 +24,6 @@ namespace DB { namespace FailPoints { -extern const char exception_during_mpp_close_tunnel[]; extern const char random_tunnel_wait_timeout_failpoint[]; } // namespace FailPoints @@ -70,6 +69,7 @@ MPPTunnel::~MPPTunnel() }); try { +<<<<<<< HEAD { std::unique_lock lock(*mu); if (status == TunnelStatus::Finished) @@ -84,6 +84,9 @@ MPPTunnel::~MPPTunnel() } LOG_FMT_TRACE(log, "waiting consumer finish!"); waitForSenderFinish(/*allow_throw=*/false); +======= + close("", true); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) } catch (...) { @@ -92,6 +95,7 @@ MPPTunnel::~MPPTunnel() LOG_FMT_TRACE(log, "destructed tunnel obj!"); } +<<<<<<< HEAD void MPPTunnel::finishSendQueue() { bool flag = send_queue->finish(); @@ -101,8 +105,10 @@ void MPPTunnel::finishSendQueue() } } +======= +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) /// exit abnormally, such as being cancelled. -void MPPTunnel::close(const String & reason) +void MPPTunnel::close(const String & reason, bool wait_sender_finish) { { std::unique_lock lk(*mu); @@ -113,9 +119,11 @@ void MPPTunnel::close(const String & reason) cv_for_status_changed.notify_all(); return; case TunnelStatus::Connected: + case TunnelStatus::WaitingForSenderFinish: { if (!reason.empty()) { +<<<<<<< HEAD try { FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_close_tunnel); @@ -127,26 +135,32 @@ void MPPTunnel::close(const String & reason) { tryLogCurrentException(log, "Failed to close tunnel: " + tunnel_id); } +======= + tunnel_sender->cancelWith(reason); + } + else + { + tunnel_sender->finish(); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) } - finishSendQueue(); break; } - case TunnelStatus::WaitingForSenderFinish: - break; case TunnelStatus::Finished: return; default: RUNTIME_ASSERT(false, log, "Unsupported tunnel status: {}", status); } } - waitForSenderFinish(/*allow_throw=*/false); + if (wait_sender_finish) + waitForSenderFinish(false); } // TODO: consider to hold a buffer -void MPPTunnel::write(const mpp::MPPDataPacket & data, bool close_after_write) +void MPPTunnel::write(const mpp::MPPDataPacket & data) { LOG_FMT_TRACE(log, "ready to write"); { +<<<<<<< HEAD /// Should keep this lock to protect async_tunnel_sender's tryFlushOne method, /// because the GRPC async thread's might release the GRPC writer while flush method might still use the GRPC writer std::unique_lock lk(*mu); @@ -167,9 +181,21 @@ void MPPTunnel::write(const mpp::MPPDataPacket & data, bool close_after_write) } return; } +======= + std::unique_lock lk(mu); + waitUntilConnectedOrFinished(lk); + if (tunnel_sender == nullptr) + throw Exception(fmt::format("write to tunnel which is already closed.")); } - // push failed, wait consumer for the final state - waitForSenderFinish(/*allow_throw=*/true); + + if (tunnel_sender->push(std::make_shared(data, getMemTracker()))) + { + connection_profile_info.bytes += data.ByteSizeLong(); + connection_profile_info.packets += 1; + return; +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) + } + throw Exception(fmt::format("write to tunnel which is already closed,{}", tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : "")); } /// done normally and being called exactly once after writing all packets @@ -177,13 +203,19 @@ void MPPTunnel::writeDone() { LOG_FMT_TRACE(log, "ready to finish, is_local: {}", mode == TunnelSenderMode::LOCAL); { +<<<<<<< HEAD std::unique_lock lk(*mu); if (status == TunnelStatus::Finished) throw Exception(fmt::format("write to tunnel which is already closed,{}", tunnel_sender ? tunnel_sender->getConsumerFinishMsg() : "")); +======= + std::unique_lock lk(mu); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) /// make sure to finish the tunnel after it is connected waitUntilConnectedOrFinished(lk); - finishSendQueue(); + if (tunnel_sender == nullptr) + throw Exception(fmt::format("write to tunnel which is already closed.")); } + tunnel_sender->finish(); waitForSenderFinish(/*allow_throw=*/true); } @@ -325,6 +357,15 @@ void SyncTunnelSender::sendJob() break; } } + /// write the last error packet if needed + if (send_queue.getStatus() == MPMCQueueStatus::CANCELLED) + { + RUNTIME_ASSERT(!send_queue.getCancelReason().empty(), "Tunnel sender cancelled without reason"); + if (!writer->write(getPacketWithError(send_queue.getCancelReason()))) + { + err_msg = "grpc writes failed."; + } + } } catch (...) { @@ -358,6 +399,7 @@ void AsyncTunnelSender::consumerFinishWithLock(const String & msg) void AsyncTunnelSender::tryFlushOne() { +<<<<<<< HEAD // When consumer finished, sending work is done already, just return if (consumer_state.msgHasSet()) return; @@ -373,6 +415,11 @@ void AsyncTunnelSender::sendOne(bool use_lock) String err_msg; bool queue_empty_flag = false; try +======= + TrackedMppDataPacketPtr res; + auto result = send_queue.pop(res); + if (result == MPMCQueueResult::OK) +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) { MPPDataPacketPtr res; queue_empty_flag = !send_queue->pop(res); @@ -394,6 +441,7 @@ void AsyncTunnelSender::sendOne(bool use_lock) LOG_ERROR(log, err_msg); trimStackTrace(err_msg); } +<<<<<<< HEAD if (!err_msg.empty() || queue_empty_flag) { if (!use_lock) @@ -413,6 +461,18 @@ LocalTunnelSender::MPPDataPacketPtr LocalTunnelSender::readForLocal() MPPDataPacketPtr res; if (send_queue->pop(res)) return res; +======= + else if (result == MPMCQueueResult::CANCELLED) + { + RUNTIME_ASSERT(!send_queue.getCancelReason().empty(), "Tunnel sender cancelled without reason"); + if (!cancel_reason_sent) + { + cancel_reason_sent = true; + res = std::make_shared(getPacketWithError(send_queue.getCancelReason()), current_memory_tracker); + return res; + } + } +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) consumerFinish(""); return nullptr; } diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 5243c9aaf36..a3d2f69ae7b 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -72,9 +72,26 @@ class TunnelSender : private boost::noncopyable , tunnel_id(tunnel_id_) { } +<<<<<<< HEAD DataPacketMPMCQueuePtr getSendQueue() { return send_queue; +======= + + virtual bool push(TrackedMppDataPacketPtr && data) + { + return send_queue.push(data) == MPMCQueueResult::OK; + } + + virtual void cancelWith(const String & reason) + { + send_queue.cancelWith(reason); + } + + virtual bool finish() + { + return send_queue.finish(); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) } void consumerFinish(const String & err_msg); String getConsumerFinishMsg() @@ -160,11 +177,39 @@ class AsyncTunnelSender : public TunnelSender : Base(mode_, send_queue_, writer_, log_, tunnel_id_) , mu(mu_) {} +<<<<<<< HEAD void tryFlushOne(); /// use_lock should be true if it's invoked from async GRPC thread void sendOne(bool use_lock = false); bool isSendQueueNextPopNonBlocking() { return send_queue->isNextPopNonBlocking(); } void consumerFinishWithLock(const String & err_msg); +======= + + bool push(TrackedMppDataPacketPtr && data) override + { + return queue.push(data); + } + + bool finish() override + { + return queue.finish(); + } + + void cancelWith(const String & reason) override + { + queue.cancelWith(reason); + } + + const String & getCancelReason() const + { + return queue.getCancelReason(); + } + + GRPCSendQueueRes pop(TrackedMppDataPacketPtr & data, void * new_tag) + { + return queue.pop(data, new_tag); + } +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) private: std::shared_ptr mu; @@ -177,7 +222,14 @@ class LocalTunnelSender : public TunnelSender public: using Base = TunnelSender; using Base::Base; +<<<<<<< HEAD MPPDataPacketPtr readForLocal(); +======= + TrackedMppDataPacketPtr readForLocal(); + +private: + bool cancel_reason_sent = false; +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) }; using TunnelSenderPtr = std::shared_ptr; @@ -236,15 +288,17 @@ class MPPTunnel : private boost::noncopyable const String & id() const { return tunnel_id; } - // write a single packet to the tunnel, it will block if tunnel is not ready. - void write(const mpp::MPPDataPacket & data, bool close_after_write = false); + // write a single packet to the tunnel's send queue, it will block if tunnel is not ready. + void write(const mpp::MPPDataPacket & data); - // finish the writing. + // finish the writing, and wait until the sender finishes. void writeDone(); - /// close() finishes the tunnel, if the tunnel is connected already, it will - /// write the error message to the tunnel, otherwise it just close the tunnel - void close(const String & reason); + /// close() cancel the tunnel's send queue with `reason`, if reason is not empty, the tunnel sender will + /// write this reason as an error message to its receiver. If `wait_sender_finish` is true, close() will + /// not return until tunnel sender finishes, otherwise, close() will return just after the send queue is + /// cancelled(which is a non-blocking operation) + void close(const String & reason, bool wait_sender_finish); // a MPPConn request has arrived. it will build connection by this tunnel; void connect(PacketWriter * writer); @@ -271,12 +325,15 @@ class MPPTunnel : private boost::noncopyable { Unconnected, // Not connect to any writer, not able to accept new data Connected, // Connected to some writer, accepting data - WaitingForSenderFinish, // Accepting all data already, wait for sender to finish + WaitingForSenderFinish, // Wait for sender to finish Finished // Final state, no more work to do }; StringRef statusToString(); +<<<<<<< HEAD void finishSendQueue(); +======= +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) void waitUntilConnectedOrFinished(std::unique_lock & lk); diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp index 8d709bb7d38..d020ba3e12b 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp @@ -20,12 +20,9 @@ namespace DB { -namespace FailPoints -{ -extern const char exception_during_mpp_write_err_to_tunnel[]; -} // namespace FailPoints namespace { +<<<<<<< HEAD inline mpp::MPPDataPacket serializeToPacket(const tipb::SelectResponse & response) { mpp::MPPDataPacket packet; @@ -34,6 +31,8 @@ inline mpp::MPPDataPacket serializeToPacket(const tipb::SelectResponse & respons return packet; } +======= +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) void checkPacketSize(size_t size) { static constexpr size_t max_packet_size = 1u << 31; @@ -115,6 +114,7 @@ void MPPTunnelSetBase::write(mpp::MPPDataPacket & packet, int16_t partit } template +<<<<<<< HEAD void MPPTunnelSetBase::writeError(const String & msg) { for (auto & tunnel : tunnels) @@ -133,6 +133,8 @@ void MPPTunnelSetBase::writeError(const String & msg) } template +======= +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) void MPPTunnelSetBase::registerTunnel(const MPPTaskId & receiver_task_id, const TunnelPtr & tunnel) { if (receiver_task_id_to_index_map.find(receiver_task_id) != receiver_task_id_to_index_map.end()) @@ -147,10 +149,10 @@ void MPPTunnelSetBase::registerTunnel(const MPPTaskId & receiver_task_id } template -void MPPTunnelSetBase::close(const String & reason) +void MPPTunnelSetBase::close(const String & reason, bool wait_sender_finish) { for (auto & tunnel : tunnels) - tunnel->close(reason); + tunnel->close(reason, wait_sender_finish); } template diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.h b/dbms/src/Flash/Mpp/MPPTunnelSet.h index e4123db1be5..a69272575f3 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.h @@ -54,8 +54,7 @@ class MPPTunnelSetBase : private boost::noncopyable // this is a partition writing. void write(tipb::SelectResponse & response, int16_t partition_id); void write(mpp::MPPDataPacket & packet, int16_t partition_id); - void writeError(const String & msg); - void close(const String & reason); + void close(const String & reason, bool wait_sender_finish); void finishWrite(); void registerTunnel(const MPPTaskId & id, const TunnelPtr & tunnel); diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index be2f3fd6e97..233ea7be126 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -73,7 +73,11 @@ bool MinTSOScheduler::tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & ta } const auto & id = task->getId(); auto query_task_set = task_manager.getQueryTaskSetWithoutLock(id.start_ts); +<<<<<<< HEAD if (nullptr == query_task_set || query_task_set->to_be_cancelled) +======= + if (nullptr == query_task_set || !query_task_set->isInNormalState()) +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) { LOG_FMT_WARNING(log, "{} is scheduled with miss or cancellation.", id.toString()); return true; diff --git a/dbms/src/Flash/Mpp/tests/gtest_grpc_send_queue.cpp b/dbms/src/Flash/Mpp/tests/gtest_grpc_send_queue.cpp new file mode 100644 index 00000000000..73a63136ffc --- /dev/null +++ b/dbms/src/Flash/Mpp/tests/gtest_grpc_send_queue.cpp @@ -0,0 +1,187 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include +#include + +namespace DB +{ +namespace tests +{ +class TestGRPCSendQueue : public testing::Test +{ +protected: + TestGRPCSendQueue() + : tag(nullptr) + , queue(10, [this](KickTag * t) -> grpc_call_error { + bool no_use; + t->FinalizeResult(&tag, &no_use); + return grpc_call_error::GRPC_CALL_OK; + }) + {} + void * tag; + GRPCSendQueue queue; + +public: + void checkTag(void * t) + { + GTEST_ASSERT_EQ(t, tag); + tag = nullptr; + } + + void checkTagInQueue(void * t) + { + GTEST_ASSERT_EQ(t, queue.tag); + } +}; + +TEST_F(TestGRPCSendQueue, Sequential) +try +{ + int p1, p2, p3; + int data; + GTEST_ASSERT_EQ(queue.push(1), true); + checkTagInQueue(nullptr); + checkTag(nullptr); + GTEST_ASSERT_EQ(queue.push(2), true); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.pop(data, &p1), GRPCSendQueueRes::OK); + GTEST_ASSERT_EQ(data, 1); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.pop(data, &p1), GRPCSendQueueRes::OK); + GTEST_ASSERT_EQ(data, 2); + checkTagInQueue(nullptr); + checkTag(nullptr); + + // `queue` is empty, `tag` should be saved. + GTEST_ASSERT_EQ(queue.pop(data, &p2), GRPCSendQueueRes::EMPTY); + checkTagInQueue(&p2); + checkTag(nullptr); + + // `tag` should be gotten. + GTEST_ASSERT_EQ(queue.push(3), true); + checkTagInQueue(nullptr); + checkTag(&p2); + + GTEST_ASSERT_EQ(queue.pop(data, &p3), GRPCSendQueueRes::OK); + GTEST_ASSERT_EQ(data, 3); + checkTagInQueue(nullptr); + checkTag(nullptr); + + // `queue` is empty, `tag` should be saved. + GTEST_ASSERT_EQ(queue.pop(data, &p3), GRPCSendQueueRes::EMPTY); + checkTagInQueue(&p3); + checkTag(nullptr); + + // `tag` should be gotten. + GTEST_ASSERT_EQ(queue.finish(), true); + checkTagInQueue(nullptr); + checkTag(&p3); + + // Next finish should fail. + GTEST_ASSERT_EQ(queue.finish(), false); + checkTagInQueue(nullptr); + checkTag(nullptr); + + // `queue` is finished and empty. + GTEST_ASSERT_EQ(queue.pop(data, &p3), GRPCSendQueueRes::FINISHED); + checkTagInQueue(nullptr); + checkTag(nullptr); +} +CATCH + +TEST_F(TestGRPCSendQueue, SequentialPopAfterFinish) +try +{ + int p1; + int data; + // `queue` is empty, `tag` should be saved. + GTEST_ASSERT_EQ(queue.pop(data, &p1), GRPCSendQueueRes::EMPTY); + checkTagInQueue(&p1); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.push(1), true); + checkTagInQueue(nullptr); + checkTag(&p1); + + GTEST_ASSERT_EQ(queue.push(2), true); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.pop(data, &p1), GRPCSendQueueRes::OK); + GTEST_ASSERT_EQ(data, 1); + checkTagInQueue(nullptr); + checkTag(nullptr); + + // Finish the `queue` while some messages still exist in `queue`. + GTEST_ASSERT_EQ(queue.finish(), true); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.pop(data, &p1), GRPCSendQueueRes::OK); + GTEST_ASSERT_EQ(data, 2); + checkTagInQueue(nullptr); + checkTag(nullptr); + + // `queue` is finished and empty. + GTEST_ASSERT_EQ(queue.pop(data, &p1), GRPCSendQueueRes::FINISHED); + checkTagInQueue(nullptr); + checkTag(nullptr); +} +CATCH + +TEST_F(TestGRPCSendQueue, SequentialPopAfterCancel) +try +{ + int p1; + int data; + + GTEST_ASSERT_EQ(queue.push(1), true); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.push(2), true); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.push(3), true); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.pop(data, &p1), GRPCSendQueueRes::OK); + GTEST_ASSERT_EQ(data, 1); + checkTagInQueue(nullptr); + checkTag(nullptr); + + // Cancel the queue + GTEST_ASSERT_EQ(queue.cancelWith("cancel test"), true); + + GTEST_ASSERT_EQ(queue.pop(data, &p1), GRPCSendQueueRes::CANCELLED); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.getCancelReason(), "cancel test"); +} +CATCH + +} // namespace tests +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index ae720badb68..4e2f2de5db4 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -30,7 +30,7 @@ class MockWriter : public PacketWriter { bool write(const mpp::MPPDataPacket & packet) override { - write_packet_vec.push_back(packet.data()); + write_packet_vec.push_back(packet.data().empty() ? packet.error().msg() : packet.data()); return true; } @@ -81,7 +81,11 @@ struct MockLocalReader bool success = tmp_packet != nullptr; if (success) { +<<<<<<< HEAD write_packet_vec.push_back(tmp_packet->data()); +======= + write_packet_vec.push_back(tmp_packet->packet.data().empty() ? tmp_packet->packet.error().msg() : tmp_packet->packet.data()); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) } else { @@ -177,7 +181,42 @@ class MockFailedAsyncWriter : public PacketWriter { if (ready && async_sender->isSendQueueNextPopNonBlocking()) { +<<<<<<< HEAD async_sender->sendOne(); +======= + TrackedMppDataPacketPtr res; + switch (async_tunnel_sender->pop(res, this)) + { + case GRPCSendQueueRes::OK: + if (write_failed) + { + async_tunnel_sender->consumerFinish(fmt::format("{} meet error: grpc writes failed.", async_tunnel_sender->getTunnelId())); + return; + } + write_packet_vec.push_back(res->packet.data()); + break; + case GRPCSendQueueRes::FINISHED: + async_tunnel_sender->consumerFinish(""); + return; + case GRPCSendQueueRes::CANCELLED: + assert(!async_tunnel_sender->getCancelReason().empty()); + if (write_failed) + { + async_tunnel_sender->consumerFinish(fmt::format("{} meet error: {}.", async_tunnel_sender->getTunnelId(), async_tunnel_sender->getCancelReason())); + return; + } + write_packet_vec.push_back(async_tunnel_sender->getCancelReason()); + async_tunnel_sender->consumerFinish(""); + return; + case GRPCSendQueueRes::EMPTY: + std::unique_lock lock(mu); + cv.wait(lock, [&] { + return has_msg; + }); + has_msg = false; + break; + } +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) } ready = true; } @@ -284,7 +323,7 @@ TEST_F(TestMPPTunnel, CloseBeforeConnect) try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), false); } @@ -294,9 +333,9 @@ TEST_F(TestMPPTunnel, CloseAfterClose) try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); } CATCH @@ -314,7 +353,7 @@ TEST_F(TestMPPTunnel, WriteAfterUnconnectFinished) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,"); + GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed."); } } @@ -329,7 +368,7 @@ TEST_F(TestMPPTunnel, WriteDoneAfterUnconnectFinished) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,"); + GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed."); } } @@ -343,27 +382,13 @@ try std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); - mpp_tunnel_ptr->close("Cancel"); - GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); - GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec.size(), 2); //Second for err msg - GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec[0], "First"); -} -CATCH - -TEST_F(TestMPPTunnel, ConnectWriteWithCloseFlag) -try -{ - auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); - std::unique_ptr writer_ptr = std::make_unique(); - mpp_tunnel_ptr->connect(writer_ptr.get()); - GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); - std::unique_ptr data_packet_ptr = std::make_unique(); - data_packet_ptr->set_data("First"); - mpp_tunnel_ptr->write(*data_packet_ptr, true); - mpp_tunnel_ptr->waitForFinish(); + mpp_tunnel_ptr->close("Cancel", true); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); - GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec.size(), 1); - GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec[0], "First"); + auto result_size = dynamic_cast(writer_ptr.get())->write_packet_vec.size(); + // close will cancel the MPMCQueue, so there is no guarantee that all the message will be consumed, only the last error packet + // must to be consumed + GTEST_ASSERT_EQ(result_size >= 1 && result_size <= 2, true); + GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec[result_size - 1], "Cancel"); } CATCH @@ -431,7 +456,7 @@ TEST_F(TestMPPTunnel, WriteAfterFinished) std::unique_ptr writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(writer_ptr.get()); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); auto data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); @@ -478,7 +503,7 @@ TEST_F(TestMPPTunnel, LocalCloseBeforeConnect) try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), false); } @@ -488,9 +513,9 @@ TEST_F(TestMPPTunnel, LocalCloseAfterClose) try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); } CATCH @@ -505,11 +530,12 @@ try std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); - mpp_tunnel_ptr->close("Cancel"); + mpp_tunnel_ptr->close("Cancel", false); local_reader_ptr->thread_manager->wait(); // Join local read thread GTEST_ASSERT_EQ(getTunnelSenderConsumerFinishedFlag(mpp_tunnel_ptr->getTunnelSender()), true); - GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec.size(), 2); //Second for err msg - GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[0], "First"); + auto result_size = local_reader_ptr->write_packet_vec.size(); + GTEST_ASSERT_EQ(result_size == 1 || result_size == 2, true); //Second for err msg + GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[result_size - 1], "Cancel"); } CATCH @@ -577,7 +603,7 @@ TEST_F(TestMPPTunnel, LocalWriteAfterFinished) auto mpp_tunnel_ptr = constructLocalSyncTunnel(); auto local_reader_ptr = connectLocalSyncTunnel(mpp_tunnel_ptr); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); - mpp_tunnel_ptr->close(""); + mpp_tunnel_ptr->close("", false); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); @@ -605,11 +631,19 @@ try mpp_tunnel_ptr->write(*data_packet_ptr); data_packet_ptr->set_data("Second"); mpp_tunnel_ptr->write(*data_packet_ptr); - mpp_tunnel_ptr->close("Cancel"); + mpp_tunnel_ptr->close("Cancel", true); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); +<<<<<<< HEAD GTEST_ASSERT_EQ(dynamic_cast(async_writer_ptr.get())->write_packet_vec.size(), 3); //Third for err msg GTEST_ASSERT_EQ(dynamic_cast(async_writer_ptr.get())->write_packet_vec[0], "First"); GTEST_ASSERT_EQ(dynamic_cast(async_writer_ptr.get())->write_packet_vec[1], "Second"); +======= + + t.join(); + auto result_size = call_data->write_packet_vec.size(); + GTEST_ASSERT_EQ(result_size >= 1 && result_size <= 3, true); //Third for err msg + GTEST_ASSERT_EQ(call_data->write_packet_vec[result_size - 1], "Cancel"); +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) } CATCH diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test index 0e272c0b621..2f3bd8caafc 100644 --- a/tests/fullstack-test/mpp/mpp_fail.test +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -80,6 +80,7 @@ mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_m ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_during_mpp_root_task_run is triggered., e.what() = DB::Exception, => DBGInvoke __disable_fail_point(exception_during_mpp_root_task_run) +<<<<<<< HEAD ## exception during mpp write err to tunnel => DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __enable_fail_point(exception_during_mpp_write_err_to_tunnel) @@ -96,6 +97,8 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel) +======= +>>>>>>> 988cde9cfa (Do not use extra threads when cancel mpp query (#5966)) ## exception during mpp hash build ## desc format='brief' select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; ## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+