Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Adjust FiberTransport to lower latency in burst TCP connect scenario #66

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions trpc/runtime/iomodel/reactor/default/tcp_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,9 +493,9 @@ int TcpConnection::ReadIoData(NoncontiguousBuffer& buff) {
continue;
}
ret = n;
TRPC_LOG_ERROR("TcpConnection::ReadIoData fd:" << socket_.GetFd() << ", ip:" << GetPeerIp()
<< ", port:" << GetPeerPort() << ", is_client:" << IsClient()
<< ", errno:" << errno << ", read failed and connection close.");
TRPC_LOG_WARN("TcpConnection::ReadIoData fd:" << socket_.GetFd() << ", ip:" << GetPeerIp()
<< ", port:" << GetPeerPort() << ", is_client:" << IsClient()
<< ", errno:" << errno << ", read failed and connection close.");
HandleClose(true);
}
break;
Expand Down
1 change: 1 addition & 0 deletions trpc/transport/client/fiber/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ cc_test(
"//trpc/common/future:future_utility",
"//trpc/coroutine:fiber",
"//trpc/coroutine:future",
"//trpc/stream:stream_handler",
"//trpc/transport/client/fiber/testing:fake_server",
"//trpc/transport/client/fiber/testing:thread_model_op",
"@com_google_googletest//:gtest_main",
Expand Down
119 changes: 52 additions & 67 deletions trpc/transport/client/fiber/fiber_connector_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,31 @@
namespace trpc {

FiberConnectorGroupManager::FiberConnectorGroupManager(TransInfo&& trans_info) : trans_info_(std::move(trans_info)) {
tcp_impl_.store(std::make_unique<TcpImpl>().release());
udp_impl_.store(std::make_unique<UdpImpl>().release());

fiber_transport_state_.store(ClientTransportState::kInitialized, std::memory_order_release);
fiber_transport_state_.store(ClientTransportState::kInitialized);
}

FiberConnectorGroupManager::~FiberConnectorGroupManager() { Stop(); }

void FiberConnectorGroupManager::Stop() {
std::scoped_lock _(mutex_);

if (fiber_transport_state_.load(std::memory_order_acquire) != ClientTransportState::kInitialized) {
ClientTransportState old_value = ClientTransportState::kInitialized;
ClientTransportState new_value = ClientTransportState::kStopped;
if (!fiber_transport_state_.compare_exchange_strong(old_value, new_value)) {
return;
}

{
Hazptr hazptr;
auto tcp_impl = hazptr.Keep(&tcp_impl_);
if (tcp_impl) {
for (auto&& [name, group] : tcp_impl->tcp_connector_groups) {
(void)name;
group->Stop();
}
}
std::unique_lock lock(tcp_mutex_);
tcp_connector_groups_.swap(tcp_connector_groups_to_destroy_);
}

for (auto& group : tcp_connector_groups_to_destroy_) {
group.second->Stop();
}

{
std::scoped_lock _(udp_mutex_);
Hazptr hazptr;
auto udp_impl = hazptr.Keep(&udp_impl_);
if (udp_impl) {
Expand All @@ -66,47 +64,45 @@
}
}
}

fiber_transport_state_.store(ClientTransportState::kStopped, std::memory_order_release);
}

void FiberConnectorGroupManager::Destroy() {
std::scoped_lock _(mutex_);

if (fiber_transport_state_.load(std::memory_order_acquire) != ClientTransportState::kStopped) {
ClientTransportState old_value = ClientTransportState::kStopped;
ClientTransportState new_value = ClientTransportState::kDestroyed;
if (!fiber_transport_state_.compare_exchange_strong(old_value, new_value)) {
return;
}

auto tcp_impl = tcp_impl_.exchange(nullptr, std::memory_order_relaxed);
if (tcp_impl) {
for (auto&& [name, group] : tcp_impl->tcp_connector_groups) {
(void)name;
group->Destroy();
delete group;
group = nullptr;
}
std::unordered_map<std::string, FiberConnectorGroup*> tmp_tcp;
{
std::unique_lock lock(tcp_mutex_);
tcp_connector_groups_to_destroy_.swap(tmp_tcp);
}

tcp_impl->tcp_connector_groups.clear();
tcp_impl->Retire();
for (auto& group : tmp_tcp) {
group.second->Destroy();
delete group.second;
group.second = nullptr;
}

auto udp_impl = udp_impl_.exchange(nullptr, std::memory_order_relaxed);
if (udp_impl) {
if (udp_impl->udp_connector_groups[0] != nullptr) {
udp_impl->udp_connector_groups[0]->Destroy();
delete udp_impl->udp_connector_groups[0];
udp_impl->udp_connector_groups[0] = nullptr;
}
{
std::scoped_lock _(udp_mutex_);
auto udp_impl = udp_impl_.exchange(nullptr, std::memory_order_relaxed);
if (udp_impl) {
if (udp_impl->udp_connector_groups[0] != nullptr) {
udp_impl->udp_connector_groups[0]->Destroy();
delete udp_impl->udp_connector_groups[0];
udp_impl->udp_connector_groups[0] = nullptr;
}

if (udp_impl->udp_connector_groups[1] != nullptr) {
udp_impl->udp_connector_groups[1]->Destroy();
delete udp_impl->udp_connector_groups[1];
udp_impl->udp_connector_groups[1] = nullptr;
if (udp_impl->udp_connector_groups[1] != nullptr) {
udp_impl->udp_connector_groups[1]->Destroy();
delete udp_impl->udp_connector_groups[1];
udp_impl->udp_connector_groups[1] = nullptr;

Check warning on line 101 in trpc/transport/client/fiber/fiber_connector_group_manager.cc

View check run for this annotation

Codecov / codecov/patch

trpc/transport/client/fiber/fiber_connector_group_manager.cc#L99-L101

Added lines #L99 - L101 were not covered by tests
}
udp_impl->Retire();
}
udp_impl->Retire();
}

fiber_transport_state_.store(ClientTransportState::kDestroyed, std::memory_order_release);
}

FiberConnectorGroup* FiberConnectorGroupManager::Get(const NodeAddr& node_addr) {
Expand All @@ -123,40 +119,29 @@
snprintf(const_cast<char*>(endpoint.c_str()), len, "%s:%d", node_addr.ip.c_str(), node_addr.port);

{
Hazptr hazptr;
auto ptr = hazptr.Keep(&tcp_impl_);
auto it = ptr->tcp_connector_groups.find(endpoint);
if (it != ptr->tcp_connector_groups.end()) {
std::shared_lock lock(tcp_mutex_);
auto it = tcp_connector_groups_.find(endpoint);
if (it != tcp_connector_groups_.end()) {
return it->second;
}
}

FiberConnectorGroup* pool{nullptr};
if (TRPC_UNLIKELY(fiber_transport_state_.load(std::memory_order_relaxed) != ClientTransportState::kInitialized)) {
return nullptr;
}

{
auto new_tcp_impl = std::make_unique<TcpImpl>();

std::scoped_lock _(mutex_);

{
Hazptr hazptr;
auto ptr = hazptr.Keep(&tcp_impl_);
auto it = ptr->tcp_connector_groups.find(endpoint);
if (it != ptr->tcp_connector_groups.end()) {
return it->second;
} else {
new_tcp_impl->tcp_connector_groups = ptr->tcp_connector_groups;
}
std::unique_lock lock(tcp_mutex_);
auto it = tcp_connector_groups_.find(endpoint);
if (it != tcp_connector_groups_.end()) {
return it->second;

Check warning on line 137 in trpc/transport/client/fiber/fiber_connector_group_manager.cc

View check run for this annotation

Codecov / codecov/patch

trpc/transport/client/fiber/fiber_connector_group_manager.cc#L137

Added line #L137 was not covered by tests
}

pool = CreateTcpConnectorGroup(node_addr);
FiberConnectorGroup* pool = CreateTcpConnectorGroup(node_addr);
tcp_connector_groups_.emplace(std::move(endpoint), pool);

new_tcp_impl->tcp_connector_groups.emplace(std::move(endpoint), pool);

tcp_impl_.exchange(new_tcp_impl.release(), std::memory_order_acq_rel)->Retire();
return pool;
}

return pool;
}

FiberConnectorGroup* FiberConnectorGroupManager::GetFromUdpGroup(const NodeAddr& node_addr) {
Expand All @@ -176,7 +161,7 @@
{
auto new_udp_impl = std::make_unique<UdpImpl>();

std::scoped_lock _(mutex_);
std::scoped_lock _(udp_mutex_);
{
Hazptr hazptr;
auto old_udp_impl = hazptr.Keep(&udp_impl_);
Expand Down
18 changes: 12 additions & 6 deletions trpc/transport/client/fiber/fiber_connector_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <atomic>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

#include "trpc/transport/client/client_transport_message.h"
Expand Down Expand Up @@ -50,20 +51,25 @@ class FiberConnectorGroupManager {
int GetUdpConnectorGroupIndex(bool is_ipv6) const { return is_ipv6 ? 1 : 0; }

private:
struct TcpImpl : HazptrObject<TcpImpl> {
std::unordered_map<std::string, FiberConnectorGroup*> tcp_connector_groups;
};

struct UdpImpl : HazptrObject<UdpImpl> {
// 0: ipv4, 1: ipv6
FiberConnectorGroup* udp_connector_groups[2];
};

std::atomic<TcpImpl*> tcp_impl_{nullptr};
std::unordered_map<std::string, FiberConnectorGroup*> tcp_connector_groups_;

// Filled by the Stop function with the connector groups that will be destroyed by the Destroy function.
std::unordered_map<std::string, FiberConnectorGroup*> tcp_connector_groups_to_destroy_;

// With gcc 8.3.1, no fiber yield is allowed in a critical section protected by the shared mutex,
// as the fiber may be rescheduled onto another thread, making the shared mutex unlock undefined behavior,
// which may lead to deadlock, as specified by cppreference:
// https://en.cppreference.com/w/cpp/thread/shared_mutex/unlock
mutable std::shared_mutex tcp_mutex_;

std::atomic<UdpImpl*> udp_impl_{nullptr};

std::mutex mutex_;
std::mutex udp_mutex_;

std::atomic<ClientTransportState> fiber_transport_state_{ClientTransportState::kUnknown};

Expand Down
55 changes: 38 additions & 17 deletions trpc/transport/client/fiber/fiber_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,17 @@

int FiberTransport::SendRecv(CTransportReqMsg* req_msg, CTransportRspMsg* rsp_msg) {
if (IsRunningInFiberWorker()) {
FiberConnectorGroup* connector_group = connector_group_manager_->Get(req_msg->context->GetNodeAddr());
TRPC_ASSERT(connector_group && "connector_group can not be nullptr");

if (!req_msg->context->IsBackupRequest()) {
return connector_group->SendRecv(req_msg, rsp_msg);
FiberConnectorGroup* connector_group = connector_group_manager_->Get(req_msg->context->GetNodeAddr());
if (connector_group != nullptr) {
return connector_group->SendRecv(req_msg, rsp_msg);
}

TRPC_FMT_ERROR("Can't get connector group of {}:{}", req_msg->context->GetNodeAddr().ip,
req_msg->context->GetNodeAddr().port);
return TrpcRetCode::TRPC_INVOKE_UNKNOWN_ERR;
}

return SendRecvForBackupRequest(req_msg, rsp_msg);
}

Expand Down Expand Up @@ -89,9 +94,6 @@
NoncontiguousBuffer buff_back(req_msg->send_data);

for (int i = 0; i < 2; ++i) {
FiberConnectorGroup* connector_group = connector_group_manager_->Get(backup_info->backup_addrs[i].addr);
TRPC_ASSERT(connector_group && "connector_group can not be nullptr");

auto cb = [&ret_code, i, backup_info, sync_retry](int err_code, std::string&& err_msg) {
if (sync_retry->IsFinished()) {
return;
Expand All @@ -110,7 +112,14 @@
}
};

connector_group->SendRecvForBackupRequest(req_msg, rsp_msg, std::move(cb));
auto& addr = backup_info->backup_addrs[i].addr;
FiberConnectorGroup* connector_group = connector_group_manager_->Get(addr);
if (connector_group != nullptr) {
connector_group->SendRecvForBackupRequest(req_msg, rsp_msg, std::move(cb));
} else {
TRPC_FMT_ERROR("Can't get connector group of {}:{}", addr.ip, addr.port);
cb(TrpcRetCode::TRPC_CLIENT_CONNECT_ERR, "connector_group is nullptr");

Check warning on line 121 in trpc/transport/client/fiber/fiber_transport.cc

View check run for this annotation

Codecov / codecov/patch

trpc/transport/client/fiber/fiber_transport.cc#L120-L121

Added lines #L120 - L121 were not covered by tests
}

if (i == 0) {
if (ret_code != TrpcRetCode::TRPC_CLIENT_CONNECT_ERR) {
Expand All @@ -132,11 +141,16 @@

Future<CTransportRspMsg> FiberTransport::AsyncSendRecv(CTransportReqMsg* req_msg) {
if (IsRunningInFiberWorker()) {
FiberConnectorGroup* connector_group = connector_group_manager_->Get(req_msg->context->GetNodeAddr());
TRPC_ASSERT(connector_group && "connector_group can not be nullptr");

if (!req_msg->context->IsBackupRequest()) {
return connector_group->AsyncSendRecv(req_msg);
FiberConnectorGroup* connector_group = connector_group_manager_->Get(req_msg->context->GetNodeAddr());
if (connector_group != nullptr) {
return connector_group->AsyncSendRecv(req_msg);
}

TRPC_FMT_ERROR("Can't get connector group of {}:{}", req_msg->context->GetNodeAddr().ip,

Check warning on line 150 in trpc/transport/client/fiber/fiber_transport.cc

View check run for this annotation

Codecov / codecov/patch

trpc/transport/client/fiber/fiber_transport.cc#L150

Added line #L150 was not covered by tests
req_msg->context->GetNodeAddr().port);
return MakeExceptionFuture<CTransportRspMsg>(
CommonException("not found connector group.", TrpcRetCode::TRPC_INVOKE_UNKNOWN_ERR));

Check warning on line 153 in trpc/transport/client/fiber/fiber_transport.cc

View check run for this annotation

Codecov / codecov/patch

trpc/transport/client/fiber/fiber_transport.cc#L153

Added line #L153 was not covered by tests
}

return MakeExceptionFuture<CTransportRspMsg>(
Expand Down Expand Up @@ -174,9 +188,13 @@

if (IsRunningInFiberWorker()) {
FiberConnectorGroup* connector_group = connector_group_manager_->Get(req_msg->context->GetNodeAddr());
TRPC_ASSERT(connector_group && "connector_group can not be nullptr");

ret = connector_group->SendOnly(req_msg);
if (connector_group != nullptr) {
ret = connector_group->SendOnly(req_msg);
} else {
TRPC_FMT_ERROR("Can't get connector group of {}:{}", req_msg->context->GetNodeAddr().ip,
req_msg->context->GetNodeAddr().port);
ret = TrpcRetCode::TRPC_INVOKE_UNKNOWN_ERR;
}

object_pool::Delete(req_msg);
} else {
Expand Down Expand Up @@ -206,9 +224,12 @@
stream::StreamReaderWriterProviderPtr FiberTransport::CreateStream(const NodeAddr& addr,
stream::StreamOptions&& stream_options) {
FiberConnectorGroup* connector_group = connector_group_manager_->Get(addr);
TRPC_ASSERT(connector_group && "connector_group can not be nullptr");
if (connector_group) {
return connector_group->CreateStream(std::move(stream_options));

Check warning on line 228 in trpc/transport/client/fiber/fiber_transport.cc

View check run for this annotation

Codecov / codecov/patch

trpc/transport/client/fiber/fiber_transport.cc#L228

Added line #L228 was not covered by tests
}

return connector_group->CreateStream(std::move(stream_options));
TRPC_FMT_ERROR("Can't get connector group of {}:{}", addr.ip, addr.port);
return nullptr;
}

} // namespace trpc
Loading
Loading