From 535f22c774229cb24df06e82ef318a0a38380268 Mon Sep 17 00:00:00 2001 From: hzlushiliang Date: Wed, 25 Oct 2023 17:28:52 +0800 Subject: [PATCH] Fix: Adjust FiberTransport to lowdown latency in busrt tcp connect scenario - Do not capture proxy in version report function - Lowdown log level when TcpConnection ReadIoData error --- .../iomodel/reactor/default/tcp_connection.cc | 6 +- trpc/transport/client/fiber/BUILD | 1 + .../fiber/fiber_connector_group_manager.cc | 119 ++++++++---------- .../fiber/fiber_connector_group_manager.h | 18 ++- .../transport/client/fiber/fiber_transport.cc | 55 +++++--- .../client/fiber/fiber_transport_test.cc | 80 +++++++++++- 6 files changed, 181 insertions(+), 98 deletions(-) diff --git a/trpc/runtime/iomodel/reactor/default/tcp_connection.cc b/trpc/runtime/iomodel/reactor/default/tcp_connection.cc index 9dbe5e11..e6523b66 100644 --- a/trpc/runtime/iomodel/reactor/default/tcp_connection.cc +++ b/trpc/runtime/iomodel/reactor/default/tcp_connection.cc @@ -493,9 +493,9 @@ int TcpConnection::ReadIoData(NoncontiguousBuffer& buff) { continue; } ret = n; - TRPC_LOG_ERROR("TcpConnection::ReadIoData fd:" << socket_.GetFd() << ", ip:" << GetPeerIp() - << ", port:" << GetPeerPort() << ", is_client:" << IsClient() - << ", errno:" << errno << ", read failed and connection close."); + TRPC_LOG_WARN("TcpConnection::ReadIoData fd:" << socket_.GetFd() << ", ip:" << GetPeerIp() + << ", port:" << GetPeerPort() << ", is_client:" << IsClient() + << ", errno:" << errno << ", read failed and connection close."); HandleClose(true); } break; diff --git a/trpc/transport/client/fiber/BUILD b/trpc/transport/client/fiber/BUILD index 84e132cb..cbe024a3 100644 --- a/trpc/transport/client/fiber/BUILD +++ b/trpc/transport/client/fiber/BUILD @@ -55,6 +55,7 @@ cc_test( "//trpc/common/future:future_utility", "//trpc/coroutine:fiber", "//trpc/coroutine:future", + "//trpc/stream:stream_handler", "//trpc/transport/client/fiber/testing:fake_server", "//trpc/transport/client/fiber/testing:thread_model_op", "@com_google_googletest//:gtest_main", diff --git a/trpc/transport/client/fiber/fiber_connector_group_manager.cc b/trpc/transport/client/fiber/fiber_connector_group_manager.cc index 663fe92a..5df23322 100644 --- a/trpc/transport/client/fiber/fiber_connector_group_manager.cc +++ b/trpc/transport/client/fiber/fiber_connector_group_manager.cc @@ -27,33 +27,31 @@ namespace trpc { FiberConnectorGroupManager::FiberConnectorGroupManager(TransInfo&& trans_info) : trans_info_(std::move(trans_info)) { - tcp_impl_.store(std::make_unique().release()); udp_impl_.store(std::make_unique().release()); - fiber_transport_state_.store(ClientTransportState::kInitialized, std::memory_order_release); + fiber_transport_state_.store(ClientTransportState::kInitialized); } FiberConnectorGroupManager::~FiberConnectorGroupManager() { Stop(); } void FiberConnectorGroupManager::Stop() { - std::scoped_lock _(mutex_); - - if (fiber_transport_state_.load(std::memory_order_acquire) != ClientTransportState::kInitialized) { + ClientTransportState old_value = ClientTransportState::kInitialized; + ClientTransportState new_value = ClientTransportState::kStopped; + if (!fiber_transport_state_.compare_exchange_strong(old_value, new_value)) { return; } { - Hazptr hazptr; - auto tcp_impl = hazptr.Keep(&tcp_impl_); - if (tcp_impl) { - for (auto&& [name, group] : tcp_impl->tcp_connector_groups) { - (void)name; - group->Stop(); - } - } + std::unique_lock lock(tcp_mutex_); + tcp_connector_groups_.swap(tcp_connector_groups_to_destroy_); + } + + for (auto& group : tcp_connector_groups_to_destroy_) { + group.second->Stop(); } { + std::scoped_lock _(udp_mutex_); Hazptr hazptr; auto udp_impl = hazptr.Keep(&udp_impl_); if (udp_impl) { @@ -66,47 +64,45 @@ void FiberConnectorGroupManager::Stop() { } } } - - fiber_transport_state_.store(ClientTransportState::kStopped, std::memory_order_release); } void FiberConnectorGroupManager::Destroy() { - std::scoped_lock _(mutex_); - - if (fiber_transport_state_.load(std::memory_order_acquire) != ClientTransportState::kStopped) { + ClientTransportState old_value = ClientTransportState::kStopped; + ClientTransportState new_value = ClientTransportState::kDestroyed; + if (!fiber_transport_state_.compare_exchange_strong(old_value, new_value)) { return; } - auto tcp_impl = tcp_impl_.exchange(nullptr, std::memory_order_relaxed); - if (tcp_impl) { - for (auto&& [name, group] : tcp_impl->tcp_connector_groups) { - (void)name; - group->Destroy(); - delete group; - group = nullptr; - } + std::unordered_map tmp_tcp; + { + std::unique_lock lock(tcp_mutex_); + tcp_connector_groups_to_destroy_.swap(tmp_tcp); + } - tcp_impl->tcp_connector_groups.clear(); - tcp_impl->Retire(); + for (auto& group : tmp_tcp) { + group.second->Destroy(); + delete group.second; + group.second = nullptr; } - auto udp_impl = udp_impl_.exchange(nullptr, std::memory_order_relaxed); - if (udp_impl) { - if (udp_impl->udp_connector_groups[0] != nullptr) { - udp_impl->udp_connector_groups[0]->Destroy(); - delete udp_impl->udp_connector_groups[0]; - udp_impl->udp_connector_groups[0] = nullptr; - } + { + std::scoped_lock _(udp_mutex_); + auto udp_impl = udp_impl_.exchange(nullptr, std::memory_order_relaxed); + if (udp_impl) { + if (udp_impl->udp_connector_groups[0] != nullptr) { + udp_impl->udp_connector_groups[0]->Destroy(); + delete udp_impl->udp_connector_groups[0]; + udp_impl->udp_connector_groups[0] = nullptr; + } - if (udp_impl->udp_connector_groups[1] != nullptr) { - udp_impl->udp_connector_groups[1]->Destroy(); - delete udp_impl->udp_connector_groups[1]; - udp_impl->udp_connector_groups[1] = nullptr; + if (udp_impl->udp_connector_groups[1] != nullptr) { + udp_impl->udp_connector_groups[1]->Destroy(); + delete udp_impl->udp_connector_groups[1]; + udp_impl->udp_connector_groups[1] = nullptr; + } + udp_impl->Retire(); } - udp_impl->Retire(); } - - fiber_transport_state_.store(ClientTransportState::kDestroyed, std::memory_order_release); } FiberConnectorGroup* FiberConnectorGroupManager::Get(const NodeAddr& node_addr) { @@ -123,40 +119,29 @@ FiberConnectorGroup* FiberConnectorGroupManager::GetFromTcpGroup(const NodeAddr& snprintf(const_cast(endpoint.c_str()), len, "%s:%d", node_addr.ip.c_str(), node_addr.port); { - Hazptr hazptr; - auto ptr = hazptr.Keep(&tcp_impl_); - auto it = ptr->tcp_connector_groups.find(endpoint); - if (it != ptr->tcp_connector_groups.end()) { + std::shared_lock lock(tcp_mutex_); + auto it = tcp_connector_groups_.find(endpoint); + if (it != tcp_connector_groups_.end()) { return it->second; } } - FiberConnectorGroup* pool{nullptr}; + if (TRPC_UNLIKELY(fiber_transport_state_.load(std::memory_order_relaxed) != ClientTransportState::kInitialized)) { + return nullptr; + } { - auto new_tcp_impl = std::make_unique(); - - std::scoped_lock _(mutex_); - - { - Hazptr hazptr; - auto ptr = hazptr.Keep(&tcp_impl_); - auto it = ptr->tcp_connector_groups.find(endpoint); - if (it != ptr->tcp_connector_groups.end()) { - return it->second; - } else { - new_tcp_impl->tcp_connector_groups = ptr->tcp_connector_groups; - } + std::unique_lock lock(tcp_mutex_); + auto it = tcp_connector_groups_.find(endpoint); + if (it != tcp_connector_groups_.end()) { + return it->second; } - pool = CreateTcpConnectorGroup(node_addr); + FiberConnectorGroup* pool = CreateTcpConnectorGroup(node_addr); + tcp_connector_groups_.emplace(std::move(endpoint), pool); - new_tcp_impl->tcp_connector_groups.emplace(std::move(endpoint), pool); - - tcp_impl_.exchange(new_tcp_impl.release(), std::memory_order_acq_rel)->Retire(); + return pool; } - - return pool; } FiberConnectorGroup* FiberConnectorGroupManager::GetFromUdpGroup(const NodeAddr& node_addr) { @@ -176,7 +161,7 @@ FiberConnectorGroup* FiberConnectorGroupManager::GetFromUdpGroup(const NodeAddr& { auto new_udp_impl = std::make_unique(); - std::scoped_lock _(mutex_); + std::scoped_lock _(udp_mutex_); { Hazptr hazptr; auto old_udp_impl = hazptr.Keep(&udp_impl_); diff --git a/trpc/transport/client/fiber/fiber_connector_group_manager.h b/trpc/transport/client/fiber/fiber_connector_group_manager.h index e3fa75d6..fdd79153 100644 --- a/trpc/transport/client/fiber/fiber_connector_group_manager.h +++ b/trpc/transport/client/fiber/fiber_connector_group_manager.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include "trpc/transport/client/client_transport_message.h" @@ -50,20 +51,25 @@ class FiberConnectorGroupManager { int GetUdpConnectorGroupIndex(bool is_ipv6) const { return is_ipv6 ? 1 : 0; } private: - struct TcpImpl : HazptrObject { - std::unordered_map tcp_connector_groups; - }; - struct UdpImpl : HazptrObject { // 0: ipv4, 1: ipv6 FiberConnectorGroup* udp_connector_groups[2]; }; - std::atomic tcp_impl_{nullptr}; + std::unordered_map tcp_connector_groups_; + + // Initialized by Stop function to store connector groups which will by destroyed by Destroy function. + std::unordered_map tcp_connector_groups_to_destroy_; + + // With gcc 8.3.1, no fiber yield is allowed in critical section protected by shared mutex, + // as fiber may rescheduled into another thread, making shared mutex unlock an undefined behavior, + // which may leads to deadlock, as specified by cpp cppreference: + // https://en.cppreference.com/w/cpp/thread/shared_mutex/unlock + mutable std::shared_mutex tcp_mutex_; std::atomic udp_impl_{nullptr}; - std::mutex mutex_; + std::mutex udp_mutex_; std::atomic fiber_transport_state_{ClientTransportState::kUnknown}; diff --git a/trpc/transport/client/fiber/fiber_transport.cc b/trpc/transport/client/fiber/fiber_transport.cc index 3329070a..fa85be7a 100644 --- a/trpc/transport/client/fiber/fiber_transport.cc +++ b/trpc/transport/client/fiber/fiber_transport.cc @@ -44,12 +44,17 @@ void FiberTransport::Destroy() { int FiberTransport::SendRecv(CTransportReqMsg* req_msg, CTransportRspMsg* rsp_msg) { if (IsRunningInFiberWorker()) { - FiberConnectorGroup* connector_group = connector_group_manager_->Get(req_msg->context->GetNodeAddr()); - TRPC_ASSERT(connector_group && "connector_group can not be nullptr"); - if (!req_msg->context->IsBackupRequest()) { - return connector_group->SendRecv(req_msg, rsp_msg); + FiberConnectorGroup* connector_group = connector_group_manager_->Get(req_msg->context->GetNodeAddr()); + if (connector_group != nullptr) { + return connector_group->SendRecv(req_msg, rsp_msg); + } + + TRPC_FMT_ERROR("Can't get connector group of {}:{}", req_msg->context->GetNodeAddr().ip, + req_msg->context->GetNodeAddr().port); + return TrpcRetCode::TRPC_INVOKE_UNKNOWN_ERR; } + return SendRecvForBackupRequest(req_msg, rsp_msg); } @@ -89,9 +94,6 @@ int FiberTransport::SendRecvForBackupRequest(CTransportReqMsg* req_msg, CTranspo NoncontiguousBuffer buff_back(req_msg->send_data); for (int i = 0; i < 2; ++i) { - FiberConnectorGroup* connector_group = connector_group_manager_->Get(backup_info->backup_addrs[i].addr); - TRPC_ASSERT(connector_group && "connector_group can not be nullptr"); - auto cb = [&ret_code, i, backup_info, sync_retry](int err_code, std::string&& err_msg) { if (sync_retry->IsFinished()) { return; @@ -110,7 +112,14 @@ int FiberTransport::SendRecvForBackupRequest(CTransportReqMsg* req_msg, CTranspo } }; - connector_group->SendRecvForBackupRequest(req_msg, rsp_msg, std::move(cb)); + auto& addr = backup_info->backup_addrs[i].addr; + FiberConnectorGroup* connector_group = connector_group_manager_->Get(addr); + if (connector_group != nullptr) { + connector_group->SendRecvForBackupRequest(req_msg, rsp_msg, std::move(cb)); + } else { + TRPC_FMT_ERROR("Can't get connector group of {}:{}", addr.ip, addr.port); + cb(TrpcRetCode::TRPC_CLIENT_CONNECT_ERR, "connector_group is nullptr"); + } if (i == 0) { if (ret_code != TrpcRetCode::TRPC_CLIENT_CONNECT_ERR) { @@ -132,11 +141,16 @@ int FiberTransport::SendRecvForBackupRequest(CTransportReqMsg* req_msg, CTranspo Future FiberTransport::AsyncSendRecv(CTransportReqMsg* req_msg) { if (IsRunningInFiberWorker()) { - FiberConnectorGroup* connector_group = connector_group_manager_->Get(req_msg->context->GetNodeAddr()); - TRPC_ASSERT(connector_group && "connector_group can not be nullptr"); - if (!req_msg->context->IsBackupRequest()) { - return connector_group->AsyncSendRecv(req_msg); + FiberConnectorGroup* connector_group = connector_group_manager_->Get(req_msg->context->GetNodeAddr()); + if (connector_group != nullptr) { + return connector_group->AsyncSendRecv(req_msg); + } + + TRPC_FMT_ERROR("Can't get connector group of {}:{}", req_msg->context->GetNodeAddr().ip, + req_msg->context->GetNodeAddr().port); + return MakeExceptionFuture( + CommonException("not found connector group.", TrpcRetCode::TRPC_INVOKE_UNKNOWN_ERR)); } return MakeExceptionFuture( @@ -174,9 +188,13 @@ int FiberTransport::SendOnly(CTransportReqMsg* req_msg) { if (IsRunningInFiberWorker()) { FiberConnectorGroup* connector_group = connector_group_manager_->Get(req_msg->context->GetNodeAddr()); - TRPC_ASSERT(connector_group && "connector_group can not be nullptr"); - - ret = connector_group->SendOnly(req_msg); + if (connector_group != nullptr) { + ret = connector_group->SendOnly(req_msg); + } else { + TRPC_FMT_ERROR("Can't get connector group of {}:{}", req_msg->context->GetNodeAddr().ip, + req_msg->context->GetNodeAddr().port); + ret = TrpcRetCode::TRPC_INVOKE_UNKNOWN_ERR; + } object_pool::Delete(req_msg); } else { @@ -206,9 +224,12 @@ int FiberTransport::SendOnlyFromOutSide(CTransportReqMsg* req_msg) { stream::StreamReaderWriterProviderPtr FiberTransport::CreateStream(const NodeAddr& addr, stream::StreamOptions&& stream_options) { FiberConnectorGroup* connector_group = connector_group_manager_->Get(addr); - TRPC_ASSERT(connector_group && "connector_group can not be nullptr"); + if (connector_group) { + return connector_group->CreateStream(std::move(stream_options)); + } - return connector_group->CreateStream(std::move(stream_options)); + TRPC_FMT_ERROR("Can't get connector group of {}:{}", addr.ip, addr.port); + return nullptr; } } // namespace trpc diff --git a/trpc/transport/client/fiber/fiber_transport_test.cc b/trpc/transport/client/fiber/fiber_transport_test.cc index 230c9726..d6f46719 100644 --- a/trpc/transport/client/fiber/fiber_transport_test.cc +++ b/trpc/transport/client/fiber/fiber_transport_test.cc @@ -26,11 +26,16 @@ #include "trpc/coroutine/fiber.h" #include "trpc/coroutine/future.h" #include "trpc/future/future.h" +#include "trpc/stream/stream_handler.h" #include "trpc/transport/client/fiber/testing/fake_server.h" #include "trpc/transport/client/fiber/testing/thread_model_op.h" namespace trpc::tesing { +int SendRecvAfterStop(std::unique_ptr& transport); +int SendOnlyAfterStop(std::unique_ptr& transport); +stream::StreamReaderWriterProviderPtr CreateStreamAfterStop(std::unique_ptr& transport); + class FiberTransportFixture : public ::testing::Test { public: static void SetUpTestCase() { @@ -94,6 +99,9 @@ class FiberTransportFixture : public ::testing::Test { std::cout << "TearDownTestCase tcp_pipeline_transport destroy" << std::endl; tcp_pool_transport->Stop(); + ASSERT_EQ(SendRecvAfterStop(tcp_pool_transport), TrpcRetCode::TRPC_INVOKE_UNKNOWN_ERR); + ASSERT_EQ(SendOnlyAfterStop(tcp_pool_transport), TrpcRetCode::TRPC_INVOKE_UNKNOWN_ERR); + ASSERT_EQ(CreateStreamAfterStop(tcp_pool_transport), nullptr); tcp_pool_transport->Destroy(); tcp_pool_transport.reset(); @@ -106,6 +114,10 @@ class FiberTransportFixture : public ::testing::Test { std::cout << "TearDownTestCase tcp_short_transport destroy" << std::endl; tcp_complex_transport->Stop(); + + ASSERT_EQ(SendRecvAfterStop(tcp_complex_transport), TrpcRetCode::TRPC_INVOKE_UNKNOWN_ERR); + ASSERT_EQ(SendOnlyAfterStop(tcp_complex_transport), TrpcRetCode::TRPC_INVOKE_UNKNOWN_ERR); + ASSERT_EQ(CreateStreamAfterStop(tcp_complex_transport), nullptr); tcp_complex_transport->Destroy(); tcp_complex_transport.reset(); @@ -142,6 +154,56 @@ std::unique_ptr FiberTransportFixture::udp_complex_transport = n std::unique_ptr FiberTransportFixture::udp_pool_transport = nullptr; std::atomic FiberTransportFixture::id_gen = 1; +int SendRecvAfterStop(std::unique_ptr& transport) { + uint32_t seq_id = FiberTransportFixture::id_gen.fetch_add(1); + ClientContextPtr context = trpc::testing::MakeTestClientContext(seq_id, 1000, + FiberTransportFixture::fake_server->GetServerAddr()); + + trpc::CTransportReqMsg req_msg; + req_msg.context = context; + + trpc::testing::TestProtocol out; + out.req_id_ = seq_id; + out.body_ = "hello"; + + trpc::NoncontiguousBuffer buff; + out.ZeroCopyEncode(buff); + + req_msg.send_data = std::move(buff); + + trpc::CTransportRspMsg rsp_msg; + + return transport->SendRecv(&req_msg, &rsp_msg); +} + +int SendOnlyAfterStop(std::unique_ptr& transport) { + uint32_t seq_id = FiberTransportFixture::id_gen.fetch_add(1); + ClientContextPtr context = trpc::testing::MakeTestClientContext(seq_id, 1000, + FiberTransportFixture::fake_server->GetServerAddr()); + auto* msg = trpc::object_pool::New(); + trpc::CTransportReqMsg& req_msg = *msg; + req_msg.context = context; + + trpc::testing::TestProtocol out; + out.req_id_ = seq_id; + out.body_ = "oneway"; + + trpc::NoncontiguousBuffer buff; + out.ZeroCopyEncode(buff); + + req_msg.send_data = std::move(buff); + + return transport->SendOnly(msg); +} + +stream::StreamReaderWriterProviderPtr CreateStreamAfterStop(std::unique_ptr& transport) { + NodeAddr addr; + addr.ip = FiberTransportFixture::fake_server->GetServerAddr().Ip(); + addr.port = FiberTransportFixture::fake_server->GetServerAddr().Port(); + stream::StreamOptions stream_options; + return transport->CreateStream(addr, std::move(stream_options)); +} + void SendRecv(std::unique_ptr& transport) { uint32_t seq_id = FiberTransportFixture::id_gen.fetch_add(1); ClientContextPtr context = trpc::testing::MakeTestClientContext(seq_id, 1000, @@ -586,7 +648,7 @@ void SendOnlyWhenEndpointError(std::unique_ptr& transport) { ASSERT_TRUE(ret == 0); } -// Test the case of sending and receiving packets on the fiber worker thread +// Test the case of sending and receiving packets on the fiber worker thread // with incorrect ip/port under different connection modes of transport TEST_F(FiberTransportFixture, testSendOnly_endpoint_error) { std::cout << "tcp_complex_transport" << std::endl; @@ -614,7 +676,7 @@ void BackupRequest(std::unique_ptr& transport) { uint32_t seq_id = FiberTransportFixture::id_gen.fetch_add(1); ClientContextPtr context = trpc::testing::MakeTestClientContext(seq_id, 1000, FiberTransportFixture::fake_server->GetServerAddr()); - + std::vector backup_addrs; backup_addrs.push_back(FiberTransportFixture::fake_server->GetServerNodeAddr()); backup_addrs.push_back(FiberTransportFixture::fake_backup_server->GetServerNodeAddr()); @@ -666,7 +728,7 @@ void BackupRequestWhenBothReturn(std::unique_ptr& transport) { uint32_t seq_id = FiberTransportFixture::id_gen.fetch_add(1); ClientContextPtr context = trpc::testing::MakeTestClientContext(seq_id, 1000, FiberTransportFixture::fake_server->GetServerAddr()); - + std::vector backup_addrs; backup_addrs.push_back(FiberTransportFixture::fake_server->GetServerNodeAddr()); backup_addrs.push_back(FiberTransportFixture::fake_backup_server->GetServerNodeAddr()); @@ -722,7 +784,7 @@ void BackupRequestWhenFirstFailed(std::unique_ptr& transport) { uint32_t seq_id = FiberTransportFixture::id_gen.fetch_add(1); ClientContextPtr context = trpc::testing::MakeTestClientContext(seq_id, 1000, FiberTransportFixture::fake_server->GetServerAddr()); - + std::vector backup_addrs; trpc::NodeAddr addr; @@ -781,7 +843,7 @@ void BackupRequestWhenBothFailed(std::unique_ptr& transport) { uint32_t seq_id = FiberTransportFixture::id_gen.fetch_add(1); ClientContextPtr context = trpc::testing::MakeTestClientContext(seq_id, 1000, FiberTransportFixture::fake_server->GetServerAddr()); - + std::vector backup_addrs; trpc::NodeAddr addr1; @@ -837,6 +899,14 @@ TEST_F(FiberTransportFixture, testBackupRequest_both_fail) { BackupRequestWhenBothFailed(tcp_pipeline_transport); } +TEST_F(FiberTransportFixture, testGetConnectorGroupReturnNull) { + trpc::FiberTransport::Options tcp_complex_opt; + tcp_complex_opt.thread_model = trpc::fiber::GetFiberThreadModel(); + tcp_complex_opt.trans_info = MakeTransInfo(true); + auto transport = std::make_unique(); + tcp_complex_transport->Init(std::move(tcp_complex_opt)); +} + } // namespace trpc::tesing TEST_WITH_FIBER_MAIN