diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 6db39e61586..b38e7d6a2ea 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -84,18 +84,25 @@ MPPTunnelBase::~MPPTunnelBase() { std::unique_lock lock(mu); if (finished) + { + LOG_FMT_TRACE(log, "already finished!"); return; + } + /// make sure to finish the tunnel after it is connected waitUntilConnectedOrFinished(lock); finishSendQueue(); } + LOG_FMT_TRACE(log, "waiting consumer finish!"); waitForConsumerFinish(/*allow_throw=*/false); } catch (...) { tryLogCurrentException(log, "Error in destructor function of MPPTunnel"); } + LOG_FMT_TRACE(log, "waiting child thread finished!"); thread_manager->wait(); + LOG_FMT_TRACE(log, "destructed tunnel obj!"); } template @@ -296,9 +303,11 @@ void MPPTunnelBase::waitForConsumerFinish(bool allow_throw) assert(connected); } #endif + LOG_FMT_TRACE(log, "start wait for consumer finish!"); String err_msg = consumer_state.getError(); // may blocking if (allow_throw && !err_msg.empty()) throw Exception("Consumer exits unexpected, " + err_msg); + LOG_FMT_TRACE(log, "end wait for consumer finish!"); } template @@ -330,8 +339,8 @@ template void MPPTunnelBase::consumerFinish(const String & err_msg, bool need_lock) { // must finish send_queue outside of the critical area to avoid deadlock with write. + LOG_FMT_TRACE(log, "calling consumer Finish"); send_queue.finish(); - auto rest_work = [this, &err_msg] { // it's safe to call it multiple times if (finished && consumer_state.errHasSet()) diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index 133142cc867..a2860c62947 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -56,6 +56,10 @@ class MPPTunnelTest : public MPPTunnelBase { return thread_manager; } + LoggerPtr getLog() + { + return log; + } }; using MPPTunnelTestPtr = std::shared_ptr; @@ -94,7 +98,9 @@ struct MockLocalReader if (tunnel) { // In case that ExchangeReceiver throw error before finish reading from mpp_tunnel + LOG_FMT_TRACE(tunnel->getLog(), "before mocklocalreader invoking consumerFinish!"); tunnel->consumerFinish("Receiver closed"); + LOG_FMT_TRACE(tunnel->getLog(), "after mocklocalreader invoking consumerFinish!"); } } @@ -483,6 +489,7 @@ try GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec.size(), 1); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[0], "First"); + LOG_FMT_TRACE(mpp_tunnel_ptr->getLog(), "basic logic done!"); } CATCH