Skip to content

Commit

Permalink
change thread name
Browse files Browse the repository at this point in the history
  • Loading branch information
brother-jin committed Jun 13, 2024
1 parent d6cb38d commit 013f51a
Show file tree
Hide file tree
Showing 20 changed files with 46 additions and 14 deletions.
3 changes: 3 additions & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class PikaReplBgWorker {
void QueueClear();
static void HandleBGWorkerWriteBinlog(void* arg);
static void HandleBGWorkerWriteDB(void* arg);
void SetThreadName(std::string& thread_name) {
bg_thread_.set_thread_name(thread_name);
}

BinlogItem binlog_item_;
net::RedisParser redis_parser_;
Expand Down
1 change: 1 addition & 0 deletions src/net/include/backend_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class BackendThread : public Thread {
*/
int StartThread() override;
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
pstd::Status Write(int fd, const std::string& msg);
pstd::Status Close(int fd);
// Try to connect fd noblock, if return EINPROGRESS or EAGAIN or EWOULDBLOCK
Expand Down
1 change: 1 addition & 0 deletions src/net/include/client_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class ClientThread : public Thread {
*/
int StartThread() override;
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
pstd::Status Write(const std::string& ip, int port, const std::string& msg);
pstd::Status Close(const std::string& ip, int port);

Expand Down
2 changes: 1 addition & 1 deletion src/net/include/net_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Thread : public pstd::noncopyable {

std::string thread_name() const { return thread_name_; }

void set_thread_name(const std::string& name) { thread_name_ = name; }
virtual void set_thread_name(const std::string& name) { thread_name_ = name; }

protected:
std::atomic_bool should_stop_;
Expand Down
2 changes: 2 additions & 0 deletions src/net/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ class ServerThread : public Thread {
// Move into server thread
virtual void MoveConnIn(std::shared_ptr<NetConn> conn, const NotifyType& type) = 0;

void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }

virtual void KillAllConns() = 0;
virtual bool KillConn(const std::string& ip_port) = 0;

Expand Down
2 changes: 2 additions & 0 deletions src/net/src/backend_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ int BackendThread::StartThread() {
if (res) {
return res;
}
set_thread_name("BackendThread");

return Thread::StartThread();
}

Expand Down
2 changes: 2 additions & 0 deletions src/net/src/client_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ int ClientThread::StartThread() {
if (res) {
return res;
}
set_thread_name("ClientThread");

return Thread::StartThread();
}

Expand Down
2 changes: 1 addition & 1 deletion src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ int DispatchThread::StartThread() {
// Adding timer tasks and run timertaskThread
timerTaskThread_.AddTimerTask("blrpop_blocking_info_scan", 250, true,
[this] { this->ScanExpiredBlockedConnsOfBlrpop(); });

timerTaskThread_.set_thread_name("TimerTaskThread");
timerTaskThread_.StartThread();
return ServerThread::StartThread();
}
Expand Down
1 change: 1 addition & 0 deletions src/net/src/holy_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ int HolyThread::StartThread() {
if (ret) {
return ret;
}
set_thread_name("HolyThread");
return ServerThread::StartThread();
}

Expand Down
2 changes: 2 additions & 0 deletions src/net/src/holy_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class HolyThread : public ServerThread {

int StopThread() override;

void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }

void set_keepalive_timeout(int timeout) override { keepalive_timeout_ = timeout; }

int conn_num() const override;
Expand Down
2 changes: 1 addition & 1 deletion src/net/src/net_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Thread::~Thread() = default;
void* Thread::RunThread(void* arg) {
auto thread = reinterpret_cast<Thread*>(arg);
if (!(thread->thread_name().empty())) {
SetThreadName(pthread_self(), "pika");
SetThreadName(pthread_self(), thread->thread_name());
}
thread->ThreadMain();
return nullptr;
Expand Down
2 changes: 1 addition & 1 deletion src/net/src/net_thread_name.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ inline bool SetThreadName(pthread_t id, const std::string& name) {
#else
inline bool SetThreadName(pthread_t id, const std::string& name) {
// printf ("no pthread_setname\n");
return false;
return pthread_setname_np(name.c_str()) == 0;
}
#endif
} // namespace net
Expand Down
1 change: 1 addition & 0 deletions src/net/src/net_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ int TimerTaskThread::StartThread() {
// if there is no timer task registered, no need of start the thread
return -1;
}
set_thread_name("TimerTask");
LOG(INFO) << "TimerTaskThread Starting...";
return Thread::StartThread();
}
Expand Down
1 change: 1 addition & 0 deletions src/net/src/net_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class TimerTaskThread : public Thread {
~TimerTaskThread() override;
int StartThread() override;
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }

uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task){
return timer_task_manager_.AddTimerTask(task_name, interval_ms, repeat_exec, task);
Expand Down
4 changes: 3 additions & 1 deletion src/net/src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <sys/time.h>

#include <string>
#include <utility>

namespace net {
Expand All @@ -24,7 +25,8 @@ int ThreadPool::Worker::start() {
return -1;
} else {
start_.store(true);
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "Worker");
std::string thread_id_str = std::to_string(reinterpret_cast<unsigned long>(thread_id_));
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "_Worker_" + thread_id_str);
}
}
return 0;
Expand Down
10 changes: 8 additions & 2 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
client_thread_ = std::make_unique<PikaReplClientThread>(cron_interval, keepalive_timeout);
client_thread_->set_thread_name("PikaReplClient");
for (int i = 0; i < g_pika_conf->sync_binlog_thread_num(); i++) {
write_binlog_workers_.emplace_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
auto new_binlog_worker = std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE);
std::string binlog_worker_name = "ReplBinlogWorker" + std::to_string(i);
new_binlog_worker->SetThreadName(binlog_worker_name);
write_binlog_workers_.emplace_back(std::move(new_binlog_worker));
}
for (int i = 0; i < g_pika_conf->sync_thread_num(); ++i) {
write_db_workers_.emplace_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
auto new_db_worker = std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE);
std::string db_worker_name = "ReplWriteDBWorker" + std::to_string(i);
new_db_worker->SetThreadName(db_worker_name);
write_db_workers_.emplace_back(std::move(new_db_worker));
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/pika_repl_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ extern PikaServer* g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;

PikaReplServer::PikaReplServer(const std::set<std::string>& ips, int port, int cron_interval) {
server_tp_ = std::make_unique<net::ThreadPool>(PIKA_REPL_SERVER_TP_SIZE, 100000);
server_tp_ = std::make_unique<net::ThreadPool>(PIKA_REPL_SERVER_TP_SIZE, 100000, "PikaReplServer");
pika_repl_server_thread_ = std::make_unique<PikaReplServerThread>(ips, port, cron_interval);
pika_repl_server_thread_->set_thread_name("PikaReplServer");
}
Expand All @@ -27,6 +27,7 @@ PikaReplServer::~PikaReplServer() {
}

int PikaReplServer::Start() {
pika_repl_server_thread_->set_thread_name("PikaReplServer");
int res = pika_repl_server_thread_->StartThread();
if (res != net::kSuccess) {
LOG(FATAL) << "Start Pika Repl Server Thread Error: " << res
Expand Down
14 changes: 10 additions & 4 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void DoPurgeDir(void* arg) {

PikaServer::PikaServer()
: exit_(false),
slow_cmd_thread_pool_flag_(false),
slow_cmd_thread_pool_flag_(g_pika_conf->slow_cmd_pool()),
last_check_compact_time_({0, 0}),
last_check_resume_time_({0, 0}),
repl_state_(PIKA_REPL_NO_CONNECT),
Expand Down Expand Up @@ -759,11 +759,13 @@ size_t PikaServer::SlowCmdThreadPoolMaxQueueSize() {
}

void PikaServer::BGSaveTaskSchedule(net::TaskFunc func, void* arg) {
bgsave_thread_.set_thread_name("BGSaveTask");
bgsave_thread_.StartThread();
bgsave_thread_.Schedule(func, arg);
}

void PikaServer::PurgelogsTaskSchedule(net::TaskFunc func, void* arg) {
purge_thread_.set_thread_name("PurgelogsTask");
purge_thread_.StartThread();
purge_thread_.Schedule(func, arg);
}
Expand All @@ -774,6 +776,7 @@ void PikaServer::PurgeDir(const std::string& path) {
}

void PikaServer::PurgeDirTaskSchedule(void (*function)(void*), void* arg) {
purge_thread_.set_thread_name("PurgeDirTask");
purge_thread_.StartThread();
purge_thread_.Schedule(function, arg);
}
Expand Down Expand Up @@ -824,6 +827,7 @@ void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& d
}

void PikaServer::KeyScanTaskSchedule(net::TaskFunc func, void* arg) {
key_scan_thread_.set_thread_name("KeyScanTask");
key_scan_thread_.StartThread();
key_scan_thread_.Schedule(func, arg);
}
Expand Down Expand Up @@ -1463,6 +1467,7 @@ void PikaServer::Bgslotsreload(const std::shared_ptr<DB>& db) {
LOG(INFO) << "Start slot reloading";

// Start new thread if needed
bgsave_thread_.set_thread_name("SlotsReload");
bgsave_thread_.StartThread();
bgsave_thread_.Schedule(&DoBgslotsreload, static_cast<void*>(this));
}
Expand Down Expand Up @@ -1530,6 +1535,7 @@ void PikaServer::Bgslotscleanup(std::vector<int> cleanupSlots, const std::shared
LOG(INFO) << "Start slot cleanup, slots: " << slotsStr << std::endl;

// Start new thread if needed
bgslots_cleanup_thread_.set_thread_name("SlotsCleanup");
bgslots_cleanup_thread_.StartThread();
bgslots_cleanup_thread_.Schedule(&DoBgslotscleanup, static_cast<void*>(this));
}
Expand Down Expand Up @@ -1634,7 +1640,7 @@ void DoBgslotscleanup(void* arg) {
void PikaServer::ResetCacheAsync(uint32_t cache_num, std::shared_ptr<DB> db, cache::CacheConfig *cache_cfg) {
if (PIKA_CACHE_STATUS_OK == db->cache()->CacheStatus()
|| PIKA_CACHE_STATUS_NONE == db->cache()->CacheStatus()) {

common_bg_thread_.set_thread_name("ResetCacheTask");
common_bg_thread_.StartThread();
BGCacheTaskArg *arg = new BGCacheTaskArg();
arg->db = db;
Expand All @@ -1658,7 +1664,7 @@ void PikaServer::ClearCacheDbAsync(std::shared_ptr<DB> db) {
LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();
return;
}

common_bg_thread_.set_thread_name("CacheClearThread");
common_bg_thread_.StartThread();
BGCacheTaskArg *arg = new BGCacheTaskArg();
arg->db = db;
Expand Down Expand Up @@ -1726,7 +1732,7 @@ void PikaServer::ClearCacheDbAsyncV2(std::shared_ptr<DB> db) {
LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();
return;
}

common_bg_thread_.set_thread_name("V2CacheClearThread");
common_bg_thread_.StartThread();
BGCacheTaskArg *arg = new BGCacheTaskArg();
arg->db = db;
Expand Down
1 change: 1 addition & 0 deletions src/rsync_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ bool RsyncClient::Init() {
master_ip_ = g_pika_server->master_ip();
master_port_ = g_pika_server->master_port() + kPortShiftRsync2;
file_set_.clear();
client_thread_->set_thread_name("RsyncClientThread");
client_thread_->StartThread();
bool ret = ComparisonUpdate();
if (!ret) {
Expand Down
4 changes: 2 additions & 2 deletions src/rsync_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ void RsyncWriteResp(RsyncService::RsyncResponse& response, std::shared_ptr<net::
}

RsyncServer::RsyncServer(const std::set<std::string>& ips, const int port) {
work_thread_ = std::make_unique<net::ThreadPool>(2, 100000);
work_thread_ = std::make_unique<net::ThreadPool>(2, 100000, "RsyncServerWork");
rsync_server_thread_ = std::make_unique<RsyncServerThread>(ips, port, 1 * 1000, this);
rsync_server_thread_->set_thread_name("RsyncServer");
}

RsyncServer::~RsyncServer() {
Expand All @@ -47,6 +46,7 @@ void RsyncServer::Schedule(net::TaskFunc func, void* arg) {

int RsyncServer::Start() {
LOG(INFO) << "start RsyncServer ...";
rsync_server_thread_->set_thread_name("RsyncServerThread");
int res = rsync_server_thread_->StartThread();
if (res != net::kSuccess) {
LOG(FATAL) << "Start rsync Server Thread Error. ret_code: " << res << " message: "
Expand Down

0 comments on commit 013f51a

Please sign in to comment.