diff --git a/cmake/mysqlclient.cmake b/cmake/mysqlclient.cmake index 99ed79b1..7709605a 100644 --- a/cmake/mysqlclient.cmake +++ b/cmake/mysqlclient.cmake @@ -7,24 +7,16 @@ if(NOT DEFINED MYSQLCLIENT_VERSION_TAG) set(MYSQLCLIENT_VERSION_TAG 8.0.39) endif() -# ================================ -# Construct Download URL -# ================================ string(REGEX REPLACE "^([0-9]+\\.[0-9]+)\\..*" "\\1" MYSQLCLIENT_MAJOR_VER "${MYSQLCLIENT_VERSION_TAG}") set(MYSQLCLIENT_URL "https://dev.mysql.com/get/Downloads/MySQL-${MYSQLCLIENT_MAJOR_VER}/mysql-${MYSQLCLIENT_VERSION_TAG}-linux-glibc2.17-x86_64-minimal.tar.xz") -# ================================ -# FetchContent Declaration -# ================================ + FetchContent_Declare( mysqlclient URL ${MYSQLCLIENT_URL} SOURCE_DIR ${TRPC_ROOT_PATH}/cmake_third_party/mysqlclient ) -# ================================ -# Populate the Content -# ================================ FetchContent_GetProperties(mysqlclient) if(NOT mysqlclient_POPULATED) FetchContent_Populate(mysqlclient) @@ -34,16 +26,13 @@ set(MYSQLCLIENT_INCLUDE_DIR "${mysqlclient_SOURCE_DIR}/include") set(MYSQLCLIENT_LIB_DIR "${mysqlclient_SOURCE_DIR}/lib") -# Copy MySQL header files to the desired directory file(MAKE_DIRECTORY "${MYSQLCLIENT_INCLUDE_DIR}/mysqlclient") file(GLOB MYSQL_HEADERS "${mysqlclient_SOURCE_DIR}/include/*.h") -# Copy header files to the target include directory +# for #include "mysqlclient/mysql.h", otherwise we will directly #include "mysql.h" file(COPY ${MYSQL_HEADERS} DESTINATION "${MYSQLCLIENT_INCLUDE_DIR}/mysqlclient") -# ================================ -# Define Imported Library -# ================================ + add_library(mysqlclient STATIC IMPORTED) set_target_properties(mysqlclient PROPERTIES @@ -54,13 +43,6 @@ set_target_properties(mysqlclient PROPERTIES target_link_libraries(mysqlclient INTERFACE "${MYSQLCLIENT_LIB_DIR}/private/libcrypto.so" "${MYSQLCLIENT_LIB_DIR}/private/libssl.so" - "${MYSQLCLIENT_LIB_DIR}/private/libgssapi_krb5.so" - "${MYSQLCLIENT_LIB_DIR}/private/libkrb5.so" - "${MYSQLCLIENT_LIB_DIR}/private/libk5crypto.so" - "${MYSQLCLIENT_LIB_DIR}/private/libkrb5support.so" - "${MYSQLCLIENT_LIB_DIR}/private/liblber.so" - "${MYSQLCLIENT_LIB_DIR}/private/libldap.so" - "${MYSQLCLIENT_LIB_DIR}/private/libsasl2.so" ) add_library(trpc_mysqlclient ALIAS mysqlclient) diff --git a/examples/features/mysql/client/fiber/fiber_client.cc b/examples/features/mysql/client/fiber/fiber_client.cc index d1c090ba..4acf77c0 100644 --- a/examples/features/mysql/client/fiber/fiber_client.cc +++ b/examples/features/mysql/client/fiber/fiber_client.cc @@ -35,6 +35,15 @@ using trpc::mysql::MysqlBlob; DEFINE_string(client_config, "fiber_client_client_config.yaml", "trpc cpp framework client_config file"); + +#define MYSQL_ERROR_CHECK(status, res) if(!s.OK()) { \ + TRPC_FMT_ERROR("status: {}", s.ToString()); \ + return; \ + } else if(!res.OK()) { \ + TRPC_FMT_ERROR("MySQL error: {}", res.GetErrorMessage()); \ + return; \ + } \ + MysqlBlob GenRandomBlob(std::size_t length) { std::string random_data; random_data.reserve(length); @@ -93,25 +102,17 @@ void TestQuery(std::shared_ptr& proxy) { MysqlResults res; trpc::Status s = proxy->Query(ctx, res, "select id, username from users where id = ? and username = ?", 3, "carol"); - if(!s.OK()) { - TRPC_FMT_ERROR("status: {}", s.ToString()); - } else if(!res.OK()) { - TRPC_FMT_ERROR("MySQL error: {}", res.GetErrorMessage()); - } else { - // const std::vector>& res_set - auto& res_set = res.ResultSet(); - int id = std::get<0>(res_set[0]); - std::string username = std::get<1>(res_set[0]); - std::cout << "id: " << id << ", username: " << username << std::endl; - } + MYSQL_ERROR_CHECK(s, res); + // const std::vector>& res_set + auto& res_set = res.ResultSet(); + int id = std::get<0>(res_set[0]); + std::string username = std::get<1>(res_set[0]); + std::cout << "id: " << id << ", username: " << username << std::endl; MysqlResults res2; s = proxy->Query(ctx, res2, "select * from users"); - if(!s.OK()) { - TRPC_FMT_ERROR("status: {}", s.ToString()); - return; - } + MYSQL_ERROR_CHECK(s, res2); int col_index = 0; int row_index = 0; @@ -139,27 +140,33 @@ void TestUpdate(std::shared_ptr& proxy) { MysqlTime mtime; mtime.SetYear(2024).SetMonth(9).SetDay(10); - proxy->Execute(ctx, exec_res, + trpc::Status s = proxy->Execute(ctx, exec_res, "insert into users (username, email, created_at)" "values (\"jack\", \"jack@abc.com\", ?)", mtime); + MYSQL_ERROR_CHECK(s, exec_res); + if(1 == exec_res.GetAffectedRowNum()) std::cout << "Insert one\n"; ctx = trpc::MakeClientContext(proxy); - proxy->Execute(ctx, query_res, "select email, created_at from users where username = ?", + s = proxy->Execute(ctx, query_res, "select email, created_at from users where username = ?", "jack"); + MYSQL_ERROR_CHECK(s, query_res); auto& res_vec = query_res.ResultSet(); std::cout << std::get<0>(res_vec[0]) << std::endl; ctx = trpc::MakeClientContext(proxy); - proxy->Execute(ctx, exec_res, "delete from users where username = \"jack\""); + s = proxy->Execute(ctx, exec_res, "delete from users where username = \"jack\""); + MYSQL_ERROR_CHECK(s, exec_res); if(1 == exec_res.GetAffectedRowNum()) std::cout << "Delete one\n"; ctx = trpc::MakeClientContext(proxy); - proxy->Execute(ctx, query_res, "select email, created_at from users where username = ?", + s = proxy->Execute(ctx, query_res, "select email, created_at from users where username = ?", "jack"); + MYSQL_ERROR_CHECK(s, query_res); + if(query_res.ResultSet().empty()) std::cout << R"(No user "jack" in users)" << std::endl; } @@ -175,8 +182,13 @@ void TestTime(std::shared_ptr& proxy) { // Use MysqlTime MysqlResults time_res; - proxy->Query(ctx, str_res, "select created_at from users"); - proxy->Query(ctx, time_res, "select created_at from users"); + trpc::Status s; + s = proxy->Query(ctx, str_res, "select created_at from users"); + MYSQL_ERROR_CHECK(s, str_res); + + + s = proxy->Query(ctx, time_res, "select created_at from users"); + MYSQL_ERROR_CHECK(s, time_res); std::string_view str_time = str_res.ResultSet()[0][0]; MysqlTime my_time = std::get<0>(time_res.ResultSet()[0]); @@ -381,16 +393,19 @@ void TestBlob(std::shared_ptr& proxy) { MysqlResults str_res; - proxy->Query(ctx, bind_blob_res, "select meta from users where username = ?", "jack"); + s = proxy->Query(ctx, bind_blob_res, "select meta from users where username = ?", "jack"); + MYSQL_ERROR_CHECK(s, bind_blob_res); if(std::get<0>(bind_blob_res.ResultSet()[0]) == blob) std::cout << "same blob\n"; - proxy->Query(ctx, bind_str_res, "select meta from users where username = ?", "jack"); + s = proxy->Query(ctx, bind_str_res, "select meta from users where username = ?", "jack"); + MYSQL_ERROR_CHECK(s, bind_str_res); if(std::get<0>(bind_str_res.ResultSet()[0]) == blob.AsStringView()) std::cout << "same blob\n"; - proxy->Query(ctx, str_res, "select meta from users where username = ?", "jack"); + s = proxy->Query(ctx, str_res, "select meta from users where username = ?", "jack"); + MYSQL_ERROR_CHECK(s, str_res); auto str_view = str_res.ResultSet()[0][0]; if(MysqlBlob(std::string(str_view)) == blob) std::cout << "same blob\n"; @@ -399,12 +414,12 @@ void TestBlob(std::shared_ptr& proxy) { int Run() { auto proxy = ::trpc::GetTrpcClient()->GetProxy<::trpc::mysql::MysqlServiceProxy>("mysql_server"); -// TestQuery(proxy); -// TestUpdate(proxy); -// TestCommit(proxy); -// TestRollback(proxy); -// TestError(proxy); -// TestBlob(proxy); + TestQuery(proxy); + TestUpdate(proxy); + TestCommit(proxy); + TestRollback(proxy); + TestError(proxy); + TestBlob(proxy); TestTime(proxy); return 0; } diff --git a/examples/features/mysql/client/future/future_client.cc b/examples/features/mysql/client/future/future_client.cc index bef5336a..bc2a2202 100644 --- a/examples/features/mysql/client/future/future_client.cc +++ b/examples/features/mysql/client/future/future_client.cc @@ -104,6 +104,12 @@ void TestAsyncQuery(std::shared_ptr& proxy) { }); std::cout << "do something\n"; trpc::future::BlockingGet(std::move(future)); + + if(future.IsFailed()) { + TRPC_FMT_ERROR(future.GetException().what()); + std::cerr << future.GetException().what() << std::endl; + return; + } } @@ -140,6 +146,12 @@ void TestAsyncTx(std::shared_ptr& proxy) { }); auto fu3 = trpc::future::BlockingGet(std::move(fu2)); + + if(fu3.IsFailed()) { + TRPC_FMT_ERROR(fu3.GetException().what()); + std::cerr << fu3.GetException().what() << std::endl; + return; + } TransactionHandle handle2(fu3.GetValue0()); // Do query in "Then Chain" and rollback diff --git a/third_party/mysqlclient/mysqlclient.BUILD b/third_party/mysqlclient/mysqlclient.BUILD index 012cc73d..01683bc8 100644 --- a/third_party/mysqlclient/mysqlclient.BUILD +++ b/third_party/mysqlclient/mysqlclient.BUILD @@ -6,15 +6,16 @@ package( cc_library( name = "mysqlclient_deps", - # Exclude libprotobuf to avoid conflicts with the existing protobuf in the project. - srcs = glob(["lib/private/*"], - exclude = ["lib/private/libproto*"]), + srcs = [ + "lib/private/libcrypto.so.3", + "lib/private/libssl.so.3" + ], visibility = ["//visibility:private"], ) cc_library( name = "mysqlclient", - srcs = glob(["lib/libmysqlclient.*"]), + srcs = glob(["lib/libmysqlclient.a"]), hdrs = glob([ "include/**/*.h", ]), diff --git a/trpc/client/mysql/executor/mysql_executor.cc b/trpc/client/mysql/executor/mysql_executor.cc index b447dd0a..01e6bf5c 100644 --- a/trpc/client/mysql/executor/mysql_executor.cc +++ b/trpc/client/mysql/executor/mysql_executor.cc @@ -42,11 +42,10 @@ MysqlExecutor::~MysqlExecutor() { // Usually it will call Close() before destructor TRPC_ASSERT(is_connected == false); Close(); - // FreeResult(); } void MysqlExecutor::Close() { - if (mysql_ != nullptr) { + if (mysql_ != nullptr && is_connected) { mysql_close(mysql_); mysql_ = nullptr; } @@ -145,4 +144,13 @@ std::string MysqlExecutor::GetErrorMessage() { return mysql_error(mysql_); } +bool MysqlExecutor::Autocommit(bool mode) { + unsigned mode_n = mode ? 1 : 0; + if(mysql_autocommit(mysql_, mode_n) != 0) + return false; + + auto_commit_ = mode; + return true; +} + } // namespace trpc::mysql diff --git a/trpc/client/mysql/executor/mysql_executor.h b/trpc/client/mysql/executor/mysql_executor.h index 6af5cc6f..8f30743c 100644 --- a/trpc/client/mysql/executor/mysql_executor.h +++ b/trpc/client/mysql/executor/mysql_executor.h @@ -111,6 +111,11 @@ class MysqlExecutor : public RefCounted { ///@brief Close the mysql connection and free the MYSQL*. void Close(); + ///@brief set auto commit for current session + ///@param mode true if enable auto commit else false + ///@return true if no error + bool Autocommit(bool mode); + ///@brief Executes an SQL query and retrieves all resulting rows, storing each row as a tuple. /// /// This function executes the provided SQL query with the specified input arguments. @@ -129,6 +134,8 @@ class MysqlExecutor : public RefCounted { template bool Execute(MysqlResults& mysql_results, const std::string& query, const InputArgs&... args); + std::string GetErrorMessage(); + void RefreshAliveTime(); uint64_t GetAliveTime(); @@ -184,8 +191,6 @@ class MysqlExecutor : public RefCounted { template bool FetchTruncatedResults(MysqlExecutor::QueryHandle& handle); - std::string GetErrorMessage(); - private: /// Just protects the `mysql_init` api @@ -193,14 +198,22 @@ class MysqlExecutor : public RefCounted { bool is_connected; + bool auto_commit_{true}; + MYSQL* mysql_; + uint64_t m_alivetime; + uint64_t executor_id_{0}; std::string hostname_; + std::string username_; + std::string password_; + std::string database_; + uint16_t port_; }; diff --git a/trpc/client/mysql/mysql_executor_pool.cc b/trpc/client/mysql/mysql_executor_pool.cc index c399fa7e..e10943df 100644 --- a/trpc/client/mysql/mysql_executor_pool.cc +++ b/trpc/client/mysql/mysql_executor_pool.cc @@ -62,6 +62,7 @@ RefPtr MysqlExecutorPoolImpl::GetOrCreate() { } executor = CreateExecutor(shard_id); + if(executor->Connect()) { executor_num_.fetch_add(1, std::memory_order_relaxed); return executor; diff --git a/trpc/client/mysql/mysql_service_proxy.cc b/trpc/client/mysql/mysql_service_proxy.cc index f93a6ccc..69f13d9b 100644 --- a/trpc/client/mysql/mysql_service_proxy.cc +++ b/trpc/client/mysql/mysql_service_proxy.cc @@ -93,15 +93,23 @@ Status MysqlServiceProxy::Begin(const ClientContextPtr& context, TransactionHand MysqlExecutorPool* pool = this->pool_manager_->Get(node_addr); auto executor = pool->GetExecutor(); + Status status; + if(executor == nullptr) { + status.SetFrameworkRetCode(TrpcRetCode::TRPC_CLIENT_CONNECT_ERR); + status.SetErrorMessage("connection failed"); + TRPC_FMT_ERROR("service name:{}, connection failed", GetServiceName()); + context->SetStatus(std::move(status)); + } else { + status = UnaryInvoke(context, executor, res, "begin"); + } - Status s = UnaryInvoke(context, executor, res, "begin"); - if(!s.OK()) { - context->SetStatus(s); + if(!status.OK()) { + context->SetStatus(status); } else if(!res.OK()) { - s = kUnknownErrorStatus; - s.SetErrorMessage(res.GetErrorMessage()); - context->SetStatus(s); + status = kUnknownErrorStatus; + status.SetErrorMessage(res.GetErrorMessage()); + context->SetStatus(status); } else { handle.SetExecutor(std::move(executor)); handle.SetState(TransactionHandle::TxState::kStart); @@ -149,7 +157,14 @@ Future MysqlServiceProxy::AsyncBegin(const ClientContextPtr & MysqlExecutorPool* pool = this->pool_manager_->Get(node_addr); auto executor = pool->GetExecutor(); - + if(executor == nullptr) { + TRPC_FMT_ERROR("service name:{}, connection failed", GetServiceName()); + Status status; + status.SetFrameworkRetCode(TrpcRetCode::TRPC_CLIENT_CONNECT_ERR); + status.SetErrorMessage("connection failed"); + context->SetStatus(status); + return MakeExceptionFuture(CommonException("connection failed")); + } return AsyncUnaryInvoke(context, executor, "begin") .Then([executor](Future>&& f) mutable { diff --git a/trpc/client/mysql/mysql_service_proxy.h b/trpc/client/mysql/mysql_service_proxy.h index a728dce1..21d7983c 100644 --- a/trpc/client/mysql/mysql_service_proxy.h +++ b/trpc/client/mysql/mysql_service_proxy.h @@ -299,14 +299,21 @@ Status MysqlServiceProxy::UnaryInvoke(const ClientContextPtr& context, const Exe } else conn = executor; - if constexpr (MysqlResults::mode == MysqlResultsMode::OnlyExec) - conn->Execute(res, sql_str, args...); - else - conn->QueryAll(res, sql_str, args...); - - - if(pool != nullptr) - pool->Reclaim(0, std::move(conn)); + if(conn == nullptr) { + Status status; + status.SetFrameworkRetCode(TrpcRetCode::TRPC_CLIENT_CONNECT_ERR); + status.SetErrorMessage("connection failed"); + TRPC_FMT_ERROR("service name:{}, connection failed", GetServiceName()); + context->SetStatus(std::move(status)); + } else { + if constexpr (MysqlResults::mode == MysqlResultsMode::OnlyExec) + conn->Execute(res, sql_str, args...); + else + conn->QueryAll(res, sql_str, args...); + + if(pool != nullptr) + pool->Reclaim(0, std::move(conn)); + } e.Set(); }); @@ -358,14 +365,11 @@ MysqlServiceProxy::AsyncUnaryInvoke(const ClientContextPtr& context, const Execu conn = executor; if (TRPC_UNLIKELY(!conn)) { - std::string error; - error += "Failed to get connection from pool: timeout while trying to acquire a connection."; - error += node_addr.ip + ":" + std::to_string(node_addr.port); - TRPC_LOG_ERROR(error); + TRPC_FMT_ERROR("service name:{}, connection failed", GetServiceName()); Status status; status.SetFrameworkRetCode(TrpcRetCode::TRPC_CLIENT_CONNECT_ERR); - status.SetErrorMessage(error); + status.SetErrorMessage("connection failed"); context->SetStatus(status); p.SetException(CommonException(status.ErrorMessage().c_str())); diff --git a/trpc/client/mysql/mysql_service_proxy_test.cc b/trpc/client/mysql/mysql_service_proxy_test.cc index 80fdff46..d5629fda 100644 --- a/trpc/client/mysql/mysql_service_proxy_test.cc +++ b/trpc/client/mysql/mysql_service_proxy_test.cc @@ -164,9 +164,7 @@ TEST_F(MysqlServiceProxyTest, Execute) { auto client_context = GetClientContext(); MysqlResults exec_res; MysqlTime mtime; - mtime.mt.year = 2024; - mtime.mt.month = 9; - mtime.mt.day = 10; + mtime.SetYear(2024).SetMonth(9).SetDay(10); mock_mysql_service_proxy_->Execute(client_context, exec_res, "insert into users (username, email, created_at) \ @@ -196,9 +194,8 @@ TEST_F(MysqlServiceProxyTest, Execute) { TEST_F(MysqlServiceProxyTest, AsyncExecute) { auto client_context = GetClientContext(); MysqlTime mtime; - mtime.mt.year = 2024; - mtime.mt.month = 9; - mtime.mt.day = 10; + mtime.SetYear(2024).SetMonth(9).SetDay(10); + auto res = mock_mysql_service_proxy_ ->AsyncExecute(client_context, @@ -369,9 +366,8 @@ TEST_F(MysqlServiceProxyTest, Transaction) { MysqlResults exec_res; MysqlResults query_res; MysqlTime mtime; - mtime.mt.year = 2024; - mtime.mt.month = 9; - mtime.mt.day = 10; + mtime.SetYear(2024).SetMonth(9).SetDay(10); + Status s = mock_mysql_service_proxy_->Begin(client_context, handle); EXPECT_EQ(s.OK(), true); mock_mysql_service_proxy_->Execute(client_context, handle, exec_res, @@ -387,6 +383,30 @@ TEST_F(MysqlServiceProxyTest, Transaction) { EXPECT_EQ(0, query_res.ResultSet().size()); } +TEST_F(MysqlServiceProxyTest, TransactionNoCommit) { + auto client_context = GetClientContext(); + TransactionHandle handle; + MysqlResults exec_res; + MysqlResults query_res; + MysqlTime mtime; + mtime.SetYear(2024).SetMonth(9).SetDay(10); + + Status s = mock_mysql_service_proxy_->Begin(client_context, handle); + EXPECT_EQ(s.OK(), true); + mock_mysql_service_proxy_->Execute(client_context, handle, exec_res, + "insert into users (username, email, created_at)" + "values (\"jack\", \"jack@abc.com\", ?)", mtime); + EXPECT_EQ(1, exec_res.GetAffectedRowNum()); + + mock_mysql_service_proxy_->Query(client_context, handle, query_res, "select * from users where username = ?", "jack"); + EXPECT_EQ(1, query_res.ResultSet().size()); + + + handle.GetExecutor()->Close(); + + mock_mysql_service_proxy_->Query(client_context, query_res, "select * from users where username = ?", "jack"); + EXPECT_EQ(0, query_res.ResultSet().size()); +} TEST_F(MysqlServiceProxyTest, AsyncTransaction) { auto client_context = GetClientContext(); @@ -427,9 +447,8 @@ TEST_F(MysqlServiceProxyTest, AsyncTransaction) { // Do query in "Then Chain" and rollback MysqlTime mtime; - mtime.mt.year = 2024; - mtime.mt.month = 9; - mtime.mt.day = 10; + mtime.SetYear(2024).SetMonth(9).SetDay(10); + auto fu4 = mock_mysql_service_proxy_ ->AsyncExecute(client_context, std::move(handle2), "insert into users (username, email, created_at)"