Skip to content

Commit

Permalink
remove libmysqlclient unnecessary dependency; fixed MysqlServiceProxy…
Browse files Browse the repository at this point in the history
… error check
  • Loading branch information
KosmosFult committed Oct 22, 2024
1 parent c907db1 commit 81e9d81
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 91 deletions.
24 changes: 3 additions & 21 deletions cmake/mysqlclient.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,16 @@ if(NOT DEFINED MYSQLCLIENT_VERSION_TAG)
set(MYSQLCLIENT_VERSION_TAG 8.0.39)
endif()

# ================================
# Construct Download URL
# ================================
string(REGEX REPLACE "^([0-9]+\\.[0-9]+)\\..*" "\\1" MYSQLCLIENT_MAJOR_VER "${MYSQLCLIENT_VERSION_TAG}")
set(MYSQLCLIENT_URL "https://dev.mysql.com/get/Downloads/MySQL-${MYSQLCLIENT_MAJOR_VER}/mysql-${MYSQLCLIENT_VERSION_TAG}-linux-glibc2.17-x86_64-minimal.tar.xz")

# ================================
# FetchContent Declaration
# ================================

FetchContent_Declare(
mysqlclient
URL ${MYSQLCLIENT_URL}
SOURCE_DIR ${TRPC_ROOT_PATH}/cmake_third_party/mysqlclient
)

# ================================
# Populate the Content
# ================================
FetchContent_GetProperties(mysqlclient)
if(NOT mysqlclient_POPULATED)
FetchContent_Populate(mysqlclient)
Expand All @@ -34,16 +26,13 @@ set(MYSQLCLIENT_INCLUDE_DIR "${mysqlclient_SOURCE_DIR}/include")
set(MYSQLCLIENT_LIB_DIR "${mysqlclient_SOURCE_DIR}/lib")


# Copy MySQL header files to the desired directory
file(MAKE_DIRECTORY "${MYSQLCLIENT_INCLUDE_DIR}/mysqlclient")
file(GLOB MYSQL_HEADERS "${mysqlclient_SOURCE_DIR}/include/*.h")

# Copy header files to the target include directory
# for #include "mysqlclient/mysql.h", otherwise we will directly #include "mysql.h"
file(COPY ${MYSQL_HEADERS} DESTINATION "${MYSQLCLIENT_INCLUDE_DIR}/mysqlclient")

# ================================
# Define Imported Library
# ================================

add_library(mysqlclient STATIC IMPORTED)

set_target_properties(mysqlclient PROPERTIES
Expand All @@ -54,13 +43,6 @@ set_target_properties(mysqlclient PROPERTIES
target_link_libraries(mysqlclient INTERFACE
"${MYSQLCLIENT_LIB_DIR}/private/libcrypto.so"
"${MYSQLCLIENT_LIB_DIR}/private/libssl.so"
"${MYSQLCLIENT_LIB_DIR}/private/libgssapi_krb5.so"
"${MYSQLCLIENT_LIB_DIR}/private/libkrb5.so"
"${MYSQLCLIENT_LIB_DIR}/private/libk5crypto.so"
"${MYSQLCLIENT_LIB_DIR}/private/libkrb5support.so"
"${MYSQLCLIENT_LIB_DIR}/private/liblber.so"
"${MYSQLCLIENT_LIB_DIR}/private/libldap.so"
"${MYSQLCLIENT_LIB_DIR}/private/libsasl2.so"
)

add_library(trpc_mysqlclient ALIAS mysqlclient)
Expand Down
75 changes: 45 additions & 30 deletions examples/features/mysql/client/fiber/fiber_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ using trpc::mysql::MysqlBlob;

DEFINE_string(client_config, "fiber_client_client_config.yaml", "trpc cpp framework client_config file");


#define MYSQL_ERROR_CHECK(status, res) if(!s.OK()) { \
TRPC_FMT_ERROR("status: {}", s.ToString()); \
return; \
} else if(!res.OK()) { \
TRPC_FMT_ERROR("MySQL error: {}", res.GetErrorMessage()); \
return; \
} \

MysqlBlob GenRandomBlob(std::size_t length) {
std::string random_data;
random_data.reserve(length);
Expand Down Expand Up @@ -93,25 +102,17 @@ void TestQuery(std::shared_ptr<trpc::mysql::MysqlServiceProxy>& proxy) {
MysqlResults<int, std::string> res;
trpc::Status s = proxy->Query(ctx, res, "select id, username from users where id = ? and username = ?", 3, "carol");

if(!s.OK()) {
TRPC_FMT_ERROR("status: {}", s.ToString());
} else if(!res.OK()) {
TRPC_FMT_ERROR("MySQL error: {}", res.GetErrorMessage());
} else {
// const std::vector<std::tuple<int, std::string>>& res_set
auto& res_set = res.ResultSet();
int id = std::get<0>(res_set[0]);
std::string username = std::get<1>(res_set[0]);
std::cout << "id: " << id << ", username: " << username << std::endl;
}
MYSQL_ERROR_CHECK(s, res);
// const std::vector<std::tuple<int, std::string>>& res_set
auto& res_set = res.ResultSet();
int id = std::get<0>(res_set[0]);
std::string username = std::get<1>(res_set[0]);
std::cout << "id: " << id << ", username: " << username << std::endl;

MysqlResults<NativeString> res2;
s = proxy->Query(ctx, res2, "select * from users");

if(!s.OK()) {
TRPC_FMT_ERROR("status: {}", s.ToString());
return;
}
MYSQL_ERROR_CHECK(s, res2);

int col_index = 0;
int row_index = 0;
Expand Down Expand Up @@ -139,27 +140,33 @@ void TestUpdate(std::shared_ptr<trpc::mysql::MysqlServiceProxy>& proxy) {
MysqlTime mtime;
mtime.SetYear(2024).SetMonth(9).SetDay(10);

proxy->Execute(ctx, exec_res,
trpc::Status s = proxy->Execute(ctx, exec_res,
"insert into users (username, email, created_at)"
"values (\"jack\", \"[email protected]\", ?)",
mtime);
MYSQL_ERROR_CHECK(s, exec_res);

if(1 == exec_res.GetAffectedRowNum())
std::cout << "Insert one\n";

ctx = trpc::MakeClientContext(proxy);
proxy->Execute(ctx, query_res, "select email, created_at from users where username = ?",
s = proxy->Execute(ctx, query_res, "select email, created_at from users where username = ?",
"jack");
MYSQL_ERROR_CHECK(s, query_res);
auto& res_vec = query_res.ResultSet();
std::cout << std::get<0>(res_vec[0]) << std::endl;

ctx = trpc::MakeClientContext(proxy);
proxy->Execute(ctx, exec_res, "delete from users where username = \"jack\"");
s = proxy->Execute(ctx, exec_res, "delete from users where username = \"jack\"");
MYSQL_ERROR_CHECK(s, exec_res);
if(1 == exec_res.GetAffectedRowNum())
std::cout << "Delete one\n";

ctx = trpc::MakeClientContext(proxy);
proxy->Execute(ctx, query_res, "select email, created_at from users where username = ?",
s = proxy->Execute(ctx, query_res, "select email, created_at from users where username = ?",
"jack");
MYSQL_ERROR_CHECK(s, query_res);

if(query_res.ResultSet().empty())
std::cout << R"(No user "jack" in users)" << std::endl;
}
Expand All @@ -175,8 +182,13 @@ void TestTime(std::shared_ptr<trpc::mysql::MysqlServiceProxy>& proxy) {
// Use MysqlTime
MysqlResults<MysqlTime> time_res;

proxy->Query(ctx, str_res, "select created_at from users");
proxy->Query(ctx, time_res, "select created_at from users");
trpc::Status s;
s = proxy->Query(ctx, str_res, "select created_at from users");
MYSQL_ERROR_CHECK(s, str_res);


s = proxy->Query(ctx, time_res, "select created_at from users");
MYSQL_ERROR_CHECK(s, time_res);

std::string_view str_time = str_res.ResultSet()[0][0];
MysqlTime my_time = std::get<0>(time_res.ResultSet()[0]);
Expand Down Expand Up @@ -381,16 +393,19 @@ void TestBlob(std::shared_ptr<trpc::mysql::MysqlServiceProxy>& proxy) {

MysqlResults<NativeString> str_res;

proxy->Query(ctx, bind_blob_res, "select meta from users where username = ?", "jack");
s = proxy->Query(ctx, bind_blob_res, "select meta from users where username = ?", "jack");
MYSQL_ERROR_CHECK(s, bind_blob_res);
if(std::get<0>(bind_blob_res.ResultSet()[0]) == blob)
std::cout << "same blob\n";


proxy->Query(ctx, bind_str_res, "select meta from users where username = ?", "jack");
s = proxy->Query(ctx, bind_str_res, "select meta from users where username = ?", "jack");
MYSQL_ERROR_CHECK(s, bind_str_res);
if(std::get<0>(bind_str_res.ResultSet()[0]) == blob.AsStringView())
std::cout << "same blob\n";

proxy->Query(ctx, str_res, "select meta from users where username = ?", "jack");
s = proxy->Query(ctx, str_res, "select meta from users where username = ?", "jack");
MYSQL_ERROR_CHECK(s, str_res);
auto str_view = str_res.ResultSet()[0][0];
if(MysqlBlob(std::string(str_view)) == blob)
std::cout << "same blob\n";
Expand All @@ -399,12 +414,12 @@ void TestBlob(std::shared_ptr<trpc::mysql::MysqlServiceProxy>& proxy) {

int Run() {
auto proxy = ::trpc::GetTrpcClient()->GetProxy<::trpc::mysql::MysqlServiceProxy>("mysql_server");
// TestQuery(proxy);
// TestUpdate(proxy);
// TestCommit(proxy);
// TestRollback(proxy);
// TestError(proxy);
// TestBlob(proxy);
TestQuery(proxy);
TestUpdate(proxy);
TestCommit(proxy);
TestRollback(proxy);
TestError(proxy);
TestBlob(proxy);
TestTime(proxy);
return 0;
}
Expand Down
12 changes: 12 additions & 0 deletions examples/features/mysql/client/future/future_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ void TestAsyncQuery(std::shared_ptr<trpc::mysql::MysqlServiceProxy>& proxy) {
});
std::cout << "do something\n";
trpc::future::BlockingGet(std::move(future));

if(future.IsFailed()) {
TRPC_FMT_ERROR(future.GetException().what());
std::cerr << future.GetException().what() << std::endl;
return;
}
}


Expand Down Expand Up @@ -140,6 +146,12 @@ void TestAsyncTx(std::shared_ptr<trpc::mysql::MysqlServiceProxy>& proxy) {
});

auto fu3 = trpc::future::BlockingGet(std::move(fu2));

if(fu3.IsFailed()) {
TRPC_FMT_ERROR(fu3.GetException().what());
std::cerr << fu3.GetException().what() << std::endl;
return;
}
TransactionHandle handle2(fu3.GetValue0());

// Do query in "Then Chain" and rollback
Expand Down
9 changes: 5 additions & 4 deletions third_party/mysqlclient/mysqlclient.BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ package(

cc_library(
name = "mysqlclient_deps",
# Exclude libprotobuf to avoid conflicts with the existing protobuf in the project.
srcs = glob(["lib/private/*"],
exclude = ["lib/private/libproto*"]),
srcs = [
"lib/private/libcrypto.so.3",
"lib/private/libssl.so.3"
],
visibility = ["//visibility:private"],
)

cc_library(
name = "mysqlclient",
srcs = glob(["lib/libmysqlclient.*"]),
srcs = glob(["lib/libmysqlclient.a"]),
hdrs = glob([
"include/**/*.h",
]),
Expand Down
12 changes: 10 additions & 2 deletions trpc/client/mysql/executor/mysql_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ MysqlExecutor::~MysqlExecutor() {
// Usually it will call Close() before destructor
TRPC_ASSERT(is_connected == false);
Close();
// FreeResult();
}

void MysqlExecutor::Close() {
if (mysql_ != nullptr) {
if (mysql_ != nullptr && is_connected) {
mysql_close(mysql_);
mysql_ = nullptr;
}
Expand Down Expand Up @@ -145,4 +144,13 @@ std::string MysqlExecutor::GetErrorMessage() {
return mysql_error(mysql_);
}

bool MysqlExecutor::Autocommit(bool mode) {
unsigned mode_n = mode ? 1 : 0;
if(mysql_autocommit(mysql_, mode_n) != 0)
return false;

auto_commit_ = mode;
return true;
}

} // namespace trpc::mysql
17 changes: 15 additions & 2 deletions trpc/client/mysql/executor/mysql_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ class MysqlExecutor : public RefCounted<MysqlExecutor> {
///@brief Close the mysql connection and free the MYSQL*.
void Close();

///@brief set auto commit for current session
///@param mode true if enable auto commit else false
///@return true if no error
bool Autocommit(bool mode);

///@brief Executes an SQL query and retrieves all resulting rows, storing each row as a tuple.
///
/// This function executes the provided SQL query with the specified input arguments.
Expand All @@ -129,6 +134,8 @@ class MysqlExecutor : public RefCounted<MysqlExecutor> {
template <typename... InputArgs>
bool Execute(MysqlResults<OnlyExec>& mysql_results, const std::string& query, const InputArgs&... args);

std::string GetErrorMessage();

void RefreshAliveTime();

uint64_t GetAliveTime();
Expand Down Expand Up @@ -184,23 +191,29 @@ class MysqlExecutor : public RefCounted<MysqlExecutor> {
template <typename... OutputArgs>
bool FetchTruncatedResults(MysqlExecutor::QueryHandle<OutputArgs...>& handle);

std::string GetErrorMessage();


private:
/// Just protects the `mysql_init` api
static std::mutex mysql_mutex;

bool is_connected;

bool auto_commit_{true};

MYSQL* mysql_;

uint64_t m_alivetime;

uint64_t executor_id_{0};

std::string hostname_;

std::string username_;

std::string password_;

std::string database_;

uint16_t port_;
};

Expand Down
1 change: 1 addition & 0 deletions trpc/client/mysql/mysql_executor_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ RefPtr<MysqlExecutor> MysqlExecutorPoolImpl::GetOrCreate() {
}

executor = CreateExecutor(shard_id);

if(executor->Connect()) {
executor_num_.fetch_add(1, std::memory_order_relaxed);
return executor;
Expand Down
29 changes: 22 additions & 7 deletions trpc/client/mysql/mysql_service_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,23 @@ Status MysqlServiceProxy::Begin(const ClientContextPtr& context, TransactionHand
MysqlExecutorPool* pool = this->pool_manager_->Get(node_addr);
auto executor = pool->GetExecutor();

Status status;
if(executor == nullptr) {
status.SetFrameworkRetCode(TrpcRetCode::TRPC_CLIENT_CONNECT_ERR);
status.SetErrorMessage("connection failed");
TRPC_FMT_ERROR("service name:{}, connection failed", GetServiceName());
context->SetStatus(std::move(status));
} else {
status = UnaryInvoke(context, executor, res, "begin");
}

Status s = UnaryInvoke(context, executor, res, "begin");

if(!s.OK()) {
context->SetStatus(s);
if(!status.OK()) {
context->SetStatus(status);
} else if(!res.OK()) {
s = kUnknownErrorStatus;
s.SetErrorMessage(res.GetErrorMessage());
context->SetStatus(s);
status = kUnknownErrorStatus;
status.SetErrorMessage(res.GetErrorMessage());
context->SetStatus(status);
} else {
handle.SetExecutor(std::move(executor));
handle.SetState(TransactionHandle::TxState::kStart);
Expand Down Expand Up @@ -149,7 +157,14 @@ Future<TransactionHandle> MysqlServiceProxy::AsyncBegin(const ClientContextPtr &

MysqlExecutorPool* pool = this->pool_manager_->Get(node_addr);
auto executor = pool->GetExecutor();

if(executor == nullptr) {
TRPC_FMT_ERROR("service name:{}, connection failed", GetServiceName());
Status status;
status.SetFrameworkRetCode(TrpcRetCode::TRPC_CLIENT_CONNECT_ERR);
status.SetErrorMessage("connection failed");
context->SetStatus(status);
return MakeExceptionFuture<TransactionHandle>(CommonException("connection failed"));
}

return AsyncUnaryInvoke<OnlyExec>(context, executor, "begin")
.Then([executor](Future<MysqlResults<OnlyExec>>&& f) mutable {
Expand Down
Loading

0 comments on commit 81e9d81

Please sign in to comment.