Skip to content

Commit

Permalink
Add CreateThread to avoid manual try-catch (#1302)
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice authored Mar 9, 2023
1 parent dfe5733 commit d48edac
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 68 deletions.
63 changes: 32 additions & 31 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,29 @@
#include "time_util.h"

Status FeedSlaveThread::Start() {
try {
t_ = std::thread([this]() {
Util::ThreadSetName("feed-replica");
sigset_t mask, omask;
sigemptyset(&mask);
sigemptyset(&omask);
sigaddset(&mask, SIGCHLD);
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &mask, &omask);
auto s = Util::SockSend(conn_->GetFD(), "+OK\r\n");
if (!s.IsOK()) {
LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg();
return;
}
this->loop();
});
} catch (const std::system_error &e) {
auto s = Util::CreateThread("feed-replica", [this] {
sigset_t mask, omask;
sigemptyset(&mask);
sigemptyset(&omask);
sigaddset(&mask, SIGCHLD);
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &mask, &omask);
auto s = Util::SockSend(conn_->GetFD(), "+OK\r\n");
if (!s.IsOK()) {
LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg();
return;
}
this->loop();
});

if (s) {
t_ = std::move(*s);
} else {
conn_ = nullptr; // prevent connection was freed when failed to start the thread
return {Status::NotOK, e.what()};
}
return Status::OK();

return s;
}

void FeedSlaveThread::Stop() {
Expand All @@ -76,7 +77,9 @@ void FeedSlaveThread::Stop() {
}

void FeedSlaveThread::Join() {
if (t_.joinable()) t_.join();
if (auto s = Util::ThreadJoin(t_); !s) {
LOG(WARNING) << "Slave thread operation failed: " << s.Msg();
}
}

void FeedSlaveThread::checkLivenessIfNeed() {
Expand Down Expand Up @@ -323,15 +326,11 @@ Status ReplicationThread::Start(std::function<void()> &&pre_fullsync_cb, std::fu
// cleanup the old backups, so we can start replication in a clean state
storage_->PurgeOldBackups(0, 0);

try {
t_ = std::thread([this]() {
Util::ThreadSetName("master-repl");
this->run();
assert(stop_flag_);
});
} catch (const std::system_error &e) {
return Status(Status::NotOK, e.what());
}
t_ = GET_OR_RET(Util::CreateThread("master-repl", [this] {
this->run();
assert(stop_flag_);
}));

return Status::OK();
}

Expand All @@ -340,7 +339,9 @@ void ReplicationThread::Stop() {

stop_flag_ = true; // Stopping procedure is asynchronous,
// handled by timer
if (t_.joinable()) t_.join();
if (auto s = Util::ThreadJoin(t_); !s) {
LOG(WARNING) << "Replication thread operation failed: " << s.Msg();
}
LOG(INFO) << "[replication] Stopped";
}

Expand Down
17 changes: 7 additions & 10 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,17 @@ SlotMigrate::~SlotMigrate() {
stop_migrate_ = true;
thread_state_ = ThreadState::Terminated;
job_cv_.notify_all();
if (t_.joinable()) t_.join();
if (auto s = Util::ThreadJoin(t_); !s) {
LOG(WARNING) << "Slot migrating thread operation failed: " << s.Msg();
}
}
}

Status SlotMigrate::CreateMigrateHandleThread() {
try {
t_ = std::thread([this]() {
Util::ThreadSetName("slot-migrate");
thread_state_ = ThreadState::Running;
this->Loop();
});
} catch (const std::exception &e) {
return {Status::NotOK, std::string(e.what())};
}
t_ = GET_OR_RET(Util::CreateThread("slot-migrate", [this] {
thread_state_ = ThreadState::Running;
this->Loop();
}));

return Status::OK();
}
Expand Down
20 changes: 12 additions & 8 deletions src/commands/cmd_replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,7 @@ class CommandFetchMeta : public Commander {
svr->stats_.IncrFullSyncCounter();

// Feed-replica-meta thread
std::thread t = std::thread([svr, repl_fd, ip, bev = conn->GetBufferEvent()]() {
Util::ThreadSetName("feed-repl-info");
auto t = GET_OR_RET(Util::CreateThread("feed-repl-info", [svr, repl_fd, ip, bev = conn->GetBufferEvent()] {
svr->IncrFetchFileThread();
auto exit = MakeScopeExit([svr, bev] {
bufferevent_free(bev);
Expand All @@ -243,8 +242,11 @@ class CommandFetchMeta : public Commander {
}
auto now = static_cast<time_t>(Util::GetTimeStamp());
svr->storage_->SetCheckpointAccessTime(now);
});
t.detach();
}));

if (auto s = Util::ThreadDetach(t); !s) {
return s;
}

return Status::OK();
}
Expand All @@ -271,8 +273,7 @@ class CommandFetchFile : public Commander {
conn->NeedNotFreeBufferEvent(); // Feed-replica-file thread will close the replica bufferevent
conn->EnableFlag(Redis::Connection::kCloseAsync);

std::thread t = std::thread([svr, repl_fd, ip, files, bev = conn->GetBufferEvent()]() {
Util::ThreadSetName("feed-repl-file");
auto t = GET_OR_RET(Util::CreateThread("feed-repl-file", [svr, repl_fd, ip, files, bev = conn->GetBufferEvent()]() {
auto exit = MakeScopeExit([bev] { bufferevent_free(bev); });
svr->IncrFetchFileThread();

Expand Down Expand Up @@ -311,8 +312,11 @@ class CommandFetchFile : public Commander {
auto now = static_cast<time_t>(Util::GetTimeStamp());
svr->storage_->SetCheckpointAccessTime(now);
svr->DecrFetchFileThread();
});
t.detach();
}));

if (auto s = Util::ThreadDetach(t); !s) {
return s;
}

return Status::OK();
}
Expand Down
6 changes: 4 additions & 2 deletions src/common/task_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Status TaskRunner::Publish(const Task &task) {
void TaskRunner::Start() {
stop_ = false;
for (int i = 0; i < n_thread_; i++) {
threads_.emplace_back([this]() {
threads_.emplace_back([this] {
Util::ThreadSetName("task-runner");
this->run();
});
Expand All @@ -57,7 +57,9 @@ void TaskRunner::Stop() {

void TaskRunner::Join() {
for (auto &thread : threads_) {
if (thread.joinable()) thread.join();
if (auto s = Util::ThreadJoin(thread); !s) {
LOG(WARNING) << "Task thread operation failed: " << s.Msg();
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/common/thread_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "thread_util.h"

#include <fmt/std.h>
#include <pthread.h>

namespace Util {
Expand All @@ -32,4 +33,19 @@ void ThreadSetName(const char *name) {
#endif
}

template <void (std::thread::*F)(), typename... Args>
Status ThreadOperationImpl(std::thread &t, const char *op, Args &&...args) {
try {
(t.*F)(std::forward<Args>(args)...);
} catch (const std::system_error &e) {
return {Status::NotOK, fmt::format("thread #{} cannot be `{}`ed: {}", t.get_id(), op, e.what())};
}

return Status::OK();
}

Status ThreadJoin(std::thread &t) { return ThreadOperationImpl<&std::thread::join>(t, "join"); }

Status ThreadDetach(std::thread &t) { return ThreadOperationImpl<&std::thread::detach>(t, "detach"); }

} // namespace Util
21 changes: 21 additions & 0 deletions src/common/thread_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,29 @@

#pragma once

#include <system_error>
#include <thread>

#include "fmt/core.h"
#include "status.h"

namespace Util {

void ThreadSetName(const char *name);

template <typename F>
StatusOr<std::thread> CreateThread(const char *name, F f) {
try {
return std::thread([name, f = std::move(f)] {
ThreadSetName(name);
f();
});
} catch (const std::system_error &e) {
return {Status::NotOK, fmt::format("thread '{}' cannot be started: {}", name, e.what())};
}
}

Status ThreadJoin(std::thread &t);
Status ThreadDetach(std::thread &t);

} // namespace Util
18 changes: 9 additions & 9 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,11 @@ Status Server::Start() {

task_runner_.Start();
// setup server cron thread
cron_thread_ = std::thread([this]() {
Util::ThreadSetName("server-cron");
this->cron();
});
cron_thread_ = GET_OR_RET(Util::CreateThread("server-cron", [this] { this->cron(); }));

compaction_checker_thread_ = std::thread([this]() {
compaction_checker_thread_ = GET_OR_RET(Util::CreateThread("compact-check", [this] {
uint64_t counter = 0;
time_t last_compact_date = 0;
Util::ThreadSetName("compact-check");
CompactionChecker compaction_checker(this->storage_);

while (!stop_) {
Expand Down Expand Up @@ -204,7 +200,7 @@ Status Server::Start() {
}
}
}
});
}));

memory_startup_use_.store(Stats::GetMemoryRSS(), std::memory_order_relaxed);
LOG(INFO) << "[server] Ready to accept connections";
Expand Down Expand Up @@ -233,8 +229,12 @@ void Server::Join() {
}

task_runner_.Join();
if (cron_thread_.joinable()) cron_thread_.join();
if (compaction_checker_thread_.joinable()) compaction_checker_thread_.join();
if (auto s = Util::ThreadJoin(cron_thread_); !s) {
LOG(WARNING) << "Cron thread operation failed: " << s.Msg();
}
if (auto s = Util::ThreadJoin(compaction_checker_thread_); !s) {
LOG(WARNING) << "Compaction checker thread operation failed: " << s.Msg();
}
}

Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reconnect) {
Expand Down
18 changes: 10 additions & 8 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -518,20 +518,22 @@ void Worker::KickoutIdleClients(int timeout) {
}

void WorkerThread::Start() {
try {
t_ = std::thread([this]() {
Util::ThreadSetName("worker");
this->worker_->Run(std::this_thread::get_id());
});
} catch (const std::system_error &e) {
LOG(ERROR) << "[worker] Failed to start worker thread, err: " << e.what();
auto s = Util::CreateThread("worker", [this] { this->worker_->Run(std::this_thread::get_id()); });

if (s) {
t_ = std::move(*s);
} else {
LOG(ERROR) << "[worker] Failed to start worker thread, err: " << s.Msg();
return;
}

LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
}

void WorkerThread::Stop() { worker_->Stop(); }

void WorkerThread::Join() {
if (t_.joinable()) t_.join();
if (auto s = Util::ThreadJoin(t_); !s) {
LOG(WARNING) << "[worker] " << s.Msg();
}
}

0 comments on commit d48edac

Please sign in to comment.