diff --git a/conf/pika.conf b/conf/pika.conf index 1396caf5e5..3fcb5d5158 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -27,6 +27,11 @@ thread-num : 1 # are dedicated to handling user requests. thread-pool-size : 12 +# This parameter is used to control whether to separate fast and slow commands. +# When slow-cmd-pool is set to yes, fast and slow commands are separated. +# When set to no, they are not separated. +slow-cmd-pool : no + # Size of the low level thread pool, The threads within this pool # are dedicated to handling slow user requests. slow-cmd-thread-pool-size : 1 diff --git a/include/pika_client_processor.h b/include/pika_client_processor.h index a2c628394e..dccd4ef96c 100644 --- a/include/pika_client_processor.h +++ b/include/pika_client_processor.h @@ -19,12 +19,10 @@ class PikaClientProcessor { int Start(); void Stop(); void SchedulePool(net::TaskFunc func, void* arg); - void ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str); size_t ThreadPoolCurQueueSize(); size_t ThreadPoolMaxQueueSize(); private: std::unique_ptr pool_; - std::vector> bg_threads_; }; #endif // PIKA_CLIENT_PROCESSOR_H_ diff --git a/include/pika_conf.h b/include/pika_conf.h index d55b45e027..e93a5e7e5b 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -186,6 +186,10 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return slotmigrate_; } + bool slow_cmd_pool() { + std::shared_lock l(rwlock_); + return slow_cmd_pool_; + } std::string server_id() { std::shared_lock l(rwlock_); return server_id_; @@ -584,6 +588,11 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("slotmigrate", value ? "yes" : "no"); slotmigrate_.store(value); } + void SetSlowCmdPool(const bool value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("slow-cmd-pool", value ? "yes" : "no"); + slow_cmd_pool_.store(value); + } void SetSlotMigrateThreadNum(const int value) { std::lock_guard l(rwlock_); TryPushDiffCommands("slotmigrate-thread-num", std::to_string(value)); @@ -872,6 +881,7 @@ class PikaConf : public pstd::BaseConf { std::string bgsave_path_; std::string bgsave_prefix_; std::string pidfile_; + std::atomic slow_cmd_pool_; std::string compression_; std::string compression_per_level_; diff --git a/include/pika_repl_bgworker.h b/include/pika_repl_bgworker.h index 2401d72009..e9d6a1b034 100644 --- a/include/pika_repl_bgworker.h +++ b/include/pika_repl_bgworker.h @@ -28,6 +28,9 @@ class PikaReplBgWorker { void QueueClear(); static void HandleBGWorkerWriteBinlog(void* arg); static void HandleBGWorkerWriteDB(void* arg); + void SetThreadName(const std::string& thread_name) { + bg_thread_.set_thread_name(thread_name); + } BinlogItem binlog_item_; net::RedisParser redis_parser_; diff --git a/include/pika_server.h b/include/pika_server.h index 4811c54045..480ba5c17e 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -97,6 +97,7 @@ class PikaServer : public pstd::noncopyable { bool force_full_sync(); void SetForceFullSync(bool v); void SetDispatchQueueLimit(int queue_limit); + void SetSlowCmdThreadPoolFlag(bool flag); storage::StorageOptions storage_options(); std::unique_ptr& pika_dispatch_thread() { return pika_dispatch_thread_; @@ -170,7 +171,6 @@ class PikaServer : public pstd::noncopyable { void FinishMetaSync(); bool MetaSyncDone(); void ResetMetaSyncStatus(); - void SetLoopDBStateMachine(bool need_loop); int GetMetaSyncTimestamp(); void UpdateMetaSyncTimestamp(); void UpdateMetaSyncTimestampWithoutLock(); @@ -181,7 +181,7 @@ class PikaServer : public pstd::noncopyable { * PikaClientProcessor Process Task */ void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd); - void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str); + // for info debug size_t ClientProcessorThreadPoolCurQueueSize(); size_t ClientProcessorThreadPoolMaxQueueSize(); @@ -644,6 +644,11 @@ class PikaServer : public pstd::noncopyable { * acl */ std::unique_ptr<::Acl> acl_ = nullptr; + + /* + * fast and slow thread pools + */ + bool slow_cmd_thread_pool_flag_; }; #endif diff --git a/src/net/include/backend_thread.h b/src/net/include/backend_thread.h index 6e39583014..b374ec86c6 100644 --- a/src/net/include/backend_thread.h +++ b/src/net/include/backend_thread.h @@ -110,6 +110,7 @@ class BackendThread : public Thread { */ int StartThread() override; int StopThread() override; + void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); } pstd::Status Write(int fd, const std::string& msg); pstd::Status Close(int fd); // Try to connect fd noblock, if return EINPROGRESS or EAGAIN or EWOULDBLOCK diff --git a/src/net/include/client_thread.h b/src/net/include/client_thread.h index 25846555c2..c57174724d 100644 --- a/src/net/include/client_thread.h +++ b/src/net/include/client_thread.h @@ -110,6 +110,7 @@ class ClientThread : public Thread { */ int StartThread() override; int StopThread() override; + void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); } pstd::Status Write(const std::string& ip, int port, const std::string& msg); pstd::Status Close(const std::string& ip, int port); diff --git a/src/net/include/net_thread.h b/src/net/include/net_thread.h index ac700819a5..ff96811e91 100644 --- a/src/net/include/net_thread.h +++ b/src/net/include/net_thread.h @@ -34,7 +34,7 @@ class Thread : public pstd::noncopyable { std::string thread_name() const { return thread_name_; } - void set_thread_name(const std::string& name) { thread_name_ = name; } + virtual void set_thread_name(const std::string& name) { thread_name_ = name; } protected: std::atomic_bool should_stop_; diff --git a/src/net/include/server_thread.h b/src/net/include/server_thread.h index d0d6d63612..b8defbf2a6 100644 --- a/src/net/include/server_thread.h +++ b/src/net/include/server_thread.h @@ -150,6 +150,8 @@ class ServerThread : public Thread { // Move into server thread virtual void MoveConnIn(std::shared_ptr conn, const NotifyType& type) = 0; + void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); } + virtual void KillAllConns() = 0; virtual bool KillConn(const std::string& ip_port) = 0; diff --git a/src/net/src/backend_thread.cc b/src/net/src/backend_thread.cc index b0eaa53687..27389293d7 100644 --- a/src/net/src/backend_thread.cc +++ b/src/net/src/backend_thread.cc @@ -48,6 +48,8 @@ int BackendThread::StartThread() { if (res) { return res; } + set_thread_name("BackendThread"); + return Thread::StartThread(); } diff --git a/src/net/src/client_thread.cc b/src/net/src/client_thread.cc index 916fd8f6ee..5561d6d3c0 100644 --- a/src/net/src/client_thread.cc +++ b/src/net/src/client_thread.cc @@ -47,6 +47,8 @@ int ClientThread::StartThread() { if (res) { return res; } + set_thread_name("ClientThread"); + return Thread::StartThread(); } diff --git a/src/net/src/dispatch_thread.cc b/src/net/src/dispatch_thread.cc index d98c44b68b..922688c178 100644 --- a/src/net/src/dispatch_thread.cc +++ b/src/net/src/dispatch_thread.cc @@ -66,7 +66,7 @@ int DispatchThread::StartThread() { // Adding timer tasks and run timertaskThread timerTaskThread_.AddTimerTask("blrpop_blocking_info_scan", 250, true, [this] { this->ScanExpiredBlockedConnsOfBlrpop(); }); - + timerTaskThread_.set_thread_name("TimerTaskThread"); timerTaskThread_.StartThread(); return ServerThread::StartThread(); } diff --git a/src/net/src/holy_thread.h b/src/net/src/holy_thread.h index 0b4f0d700b..312de4c84f 100644 --- a/src/net/src/holy_thread.h +++ b/src/net/src/holy_thread.h @@ -35,6 +35,8 @@ class HolyThread : public ServerThread { int StopThread() override; + void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); } + void set_keepalive_timeout(int timeout) override { keepalive_timeout_ = timeout; } int conn_num() const override; diff --git a/src/net/src/net_thread_name.h b/src/net/src/net_thread_name.h index e85cd1a6df..5d8dc78db8 100644 --- a/src/net/src/net_thread_name.h +++ b/src/net/src/net_thread_name.h @@ -26,7 +26,7 @@ inline bool SetThreadName(pthread_t id, const std::string& name) { #else inline bool SetThreadName(pthread_t id, const std::string& name) { // printf ("no pthread_setname\n"); - return false; + return pthread_setname_np(name.c_str()) == 0; } #endif } // namespace net diff --git a/src/net/src/net_util.cc b/src/net/src/net_util.cc index 6f1f4692d0..7efbb0f6cd 100644 --- a/src/net/src/net_util.cc +++ b/src/net/src/net_util.cc @@ -126,6 +126,7 @@ int TimerTaskThread::StartThread() { // if there is no timer task registered, no need of start the thread return -1; } + set_thread_name("TimerTask"); LOG(INFO) << "TimerTaskThread Starting..."; return Thread::StartThread(); } diff --git a/src/net/src/net_util.h b/src/net/src/net_util.h index a6fcbdc932..fe96e0a950 100644 --- a/src/net/src/net_util.h +++ b/src/net/src/net_util.h @@ -80,6 +80,7 @@ class TimerTaskThread : public Thread { ~TimerTaskThread() override; int StartThread() override; int StopThread() override; + void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); } uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function &task){ return timer_task_manager_.AddTimerTask(task_name, interval_ms, repeat_exec, task); diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index 4ea4b82125..8e20694244 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -8,6 +8,7 @@ #include +#include #include namespace net { @@ -24,7 +25,8 @@ int ThreadPool::Worker::start() { return -1; } else { start_.store(true); - SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "Worker"); + std::string thread_id_str = std::to_string(reinterpret_cast(thread_id_)); + SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "_Worker_" + thread_id_str); } } return 0; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index bb52159dd6..c47a90649b 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1602,6 +1602,12 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, g_pika_conf->slotmigrate() ? "yes" : "no"); } + if (pstd::stringmatch(pattern.data(), "slow-cmd-pool", 1)) { + elements += 2; + EncodeString(&config_body, "slow-cmd-pool"); + EncodeString(&config_body, g_pika_conf->slow_cmd_pool() ? "yes" : "no"); + } + if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)!= 0) { elements += 2; EncodeString(&config_body, "slotmigrate-thread-num"); @@ -2143,6 +2149,7 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { "requirepass", "masterauth", "slotmigrate", + "slow-cmd-pool", "slotmigrate-thread-num", "thread-migrate-keys-num", "userpass", @@ -2302,6 +2309,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { } g_pika_conf->SetSlotMigrate(slotmigrate); res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "slow_cmd_pool") { + bool SlowCmdPool; + if (value == "yes") { + SlowCmdPool = true; + } else if (value == "no") { + SlowCmdPool = false; + } else { + res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slow-cmd-pool'\r\n"); + return; + } + g_pika_conf->SetSlowCmdPool(SlowCmdPool); + g_pika_server->SetSlowCmdThreadPoolFlag(SlowCmdPool); + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "slowlog-log-slower-than") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n"); diff --git a/src/pika_client_processor.cc b/src/pika_client_processor.cc index 8a26ccd4a4..5a1c60cee0 100644 --- a/src/pika_client_processor.cc +++ b/src/pika_client_processor.cc @@ -9,10 +9,6 @@ PikaClientProcessor::PikaClientProcessor(size_t worker_num, size_t max_queue_size, const std::string& name_prefix) { pool_ = std::make_unique(worker_num, max_queue_size, name_prefix + "Pool"); - for (size_t i = 0; i < worker_num; ++i) { - bg_threads_.push_back(std::make_unique(max_queue_size)); - bg_threads_.back()->set_thread_name(name_prefix + "BgThread"); - } } PikaClientProcessor::~PikaClientProcessor() { @@ -24,29 +20,15 @@ int PikaClientProcessor::Start() { if (res != net::kSuccess) { return res; } - for (auto& bg_thread : bg_threads_) { - res = bg_thread->StartThread(); - if (res != net::kSuccess) { - return res; - } - } return res; } void PikaClientProcessor::Stop() { pool_->stop_thread_pool(); - for (auto & bg_thread : bg_threads_) { - bg_thread->StopThread(); - } } void PikaClientProcessor::SchedulePool(net::TaskFunc func, void* arg) { pool_->Schedule(func, arg); } -void PikaClientProcessor::ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str) { - std::size_t index = std::hash{}(hash_str) % bg_threads_.size(); - bg_threads_[index]->Schedule(func, arg); -} - size_t PikaClientProcessor::ThreadPoolCurQueueSize() { size_t cur_size = 0; if (pool_) { diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 4ca6710b60..3d54e3e895 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -66,6 +66,11 @@ int PikaConf::Load() { GetConfStr("slotmigrate", &smgrt); slotmigrate_.store(smgrt == "yes" ? true : false); + // slow cmd thread pool + std::string slowcmdpool; + GetConfStr("slow-cmd-pool", &slowcmdpool); + slow_cmd_pool_.store(slowcmdpool == "yes" ? true : false); + int binlog_writer_num = 1; GetConfInt("binlog-writer-num", &binlog_writer_num); if (binlog_writer_num <= 0 || binlog_writer_num > 24) { @@ -154,11 +159,11 @@ int PikaConf::Load() { } GetConfInt("slow-cmd-thread-pool-size", &slow_cmd_thread_pool_size_); - if (slow_cmd_thread_pool_size_ <= 0) { - slow_cmd_thread_pool_size_ = 12; + if (slow_cmd_thread_pool_size_ < 0) { + slow_cmd_thread_pool_size_ = 8; } - if (slow_cmd_thread_pool_size_ > 100) { - slow_cmd_thread_pool_size_ = 100; + if (slow_cmd_thread_pool_size_ > 50) { + slow_cmd_thread_pool_size_ = 50; } std::string slow_cmd_list; diff --git a/src/pika_repl_client.cc b/src/pika_repl_client.cc index 352fbdf7e5..2d53be265c 100644 --- a/src/pika_repl_client.cc +++ b/src/pika_repl_client.cc @@ -28,10 +28,16 @@ PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) { client_thread_ = std::make_unique(cron_interval, keepalive_timeout); client_thread_->set_thread_name("PikaReplClient"); for (int i = 0; i < g_pika_conf->sync_binlog_thread_num(); i++) { - write_binlog_workers_.emplace_back(std::make_unique(PIKA_SYNC_BUFFER_SIZE)); + auto new_binlog_worker = std::make_unique(PIKA_SYNC_BUFFER_SIZE); + std::string binlog_worker_name = "ReplBinlogWorker" + std::to_string(i); + new_binlog_worker->SetThreadName(binlog_worker_name); + write_binlog_workers_.emplace_back(std::move(new_binlog_worker)); } for (int i = 0; i < g_pika_conf->sync_thread_num(); ++i) { - write_db_workers_.emplace_back(std::make_unique(PIKA_SYNC_BUFFER_SIZE)); + auto new_db_worker = std::make_unique(PIKA_SYNC_BUFFER_SIZE); + std::string db_worker_name = "ReplWriteDBWorker" + std::to_string(i); + new_db_worker->SetThreadName(db_worker_name); + write_db_workers_.emplace_back(std::move(new_db_worker)); } } diff --git a/src/pika_repl_server.cc b/src/pika_repl_server.cc index a99fc18047..b92d239b18 100644 --- a/src/pika_repl_server.cc +++ b/src/pika_repl_server.cc @@ -17,7 +17,7 @@ extern PikaServer* g_pika_server; extern std::unique_ptr g_pika_rm; PikaReplServer::PikaReplServer(const std::set& ips, int port, int cron_interval) { - server_tp_ = std::make_unique(PIKA_REPL_SERVER_TP_SIZE, 100000); + server_tp_ = std::make_unique(PIKA_REPL_SERVER_TP_SIZE, 100000, "PikaReplServer"); pika_repl_server_thread_ = std::make_unique(ips, port, cron_interval); pika_repl_server_thread_->set_thread_name("PikaReplServer"); } @@ -27,6 +27,7 @@ PikaReplServer::~PikaReplServer() { } int PikaReplServer::Start() { + pika_repl_server_thread_->set_thread_name("PikaReplServer"); int res = pika_repl_server_thread_->StartThread(); if (res != net::kSuccess) { LOG(FATAL) << "Start Pika Repl Server Thread Error: " << res diff --git a/src/pika_server.cc b/src/pika_server.cc index 5c3aae16df..eaa73e5749 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -43,6 +43,7 @@ void DoPurgeDir(void* arg) { PikaServer::PikaServer() : exit_(false), + slow_cmd_thread_pool_flag_(g_pika_conf->slow_cmd_pool()), last_check_compact_time_({0, 0}), last_check_resume_time_({0, 0}), repl_state_(PIKA_REPL_NO_CONNECT), @@ -100,6 +101,7 @@ PikaServer::PikaServer() } acl_ = std::make_unique<::Acl>(); + SetSlowCmdThreadPoolFlag(g_pika_conf->slow_cmd_pool()); } PikaServer::~PikaServer() { @@ -166,12 +168,6 @@ void PikaServer::Start() { LOG(FATAL) << "Start PikaClientProcessor Error: " << ret << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); } - ret = pika_slow_cmd_thread_pool_->start_thread_pool(); - if (ret != net::kSuccess) { - dbs_.clear(); - LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret - << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); - } ret = pika_dispatch_thread_->StartThread(); if (ret != net::kSuccess) { dbs_.clear(); @@ -205,6 +201,24 @@ void PikaServer::Start() { LOG(INFO) << "Goodbye..."; } +void PikaServer::SetSlowCmdThreadPoolFlag(bool flag) { + slow_cmd_thread_pool_flag_ = flag; + int ret = 0; + if (flag) { + ret = pika_slow_cmd_thread_pool_->start_thread_pool(); + if (ret != net::kSuccess) { + dbs_.clear(); + LOG(ERROR) << "Start PikaLowLevelThreadPool Error: " << ret + << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); + } + } else { + while (SlowCmdThreadPoolCurQueueSize() != 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + pika_slow_cmd_thread_pool_->stop_thread_pool(); + } +} + void PikaServer::Exit() { g_pika_server->DisableCompact(); exit_mutex_.unlock(); @@ -707,17 +721,13 @@ void PikaServer::SetFirstMetaSync(bool v) { } void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd) { - if (is_slow_cmd) { + if (is_slow_cmd && g_pika_conf->slow_cmd_pool()) { pika_slow_cmd_thread_pool_->Schedule(func, arg); return; } pika_client_processor_->SchedulePool(func, arg); } -void PikaServer::ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str) { - pika_client_processor_->ScheduleBgThreads(func, arg, hash_str); -} - size_t PikaServer::ClientProcessorThreadPoolCurQueueSize() { if (!pika_client_processor_) { return 0; @@ -749,11 +759,13 @@ size_t PikaServer::SlowCmdThreadPoolMaxQueueSize() { } void PikaServer::BGSaveTaskSchedule(net::TaskFunc func, void* arg) { + bgsave_thread_.set_thread_name("BGSaveTask"); bgsave_thread_.StartThread(); bgsave_thread_.Schedule(func, arg); } void PikaServer::PurgelogsTaskSchedule(net::TaskFunc func, void* arg) { + purge_thread_.set_thread_name("PurgelogsTask"); purge_thread_.StartThread(); purge_thread_.Schedule(func, arg); } @@ -764,6 +776,7 @@ void PikaServer::PurgeDir(const std::string& path) { } void PikaServer::PurgeDirTaskSchedule(void (*function)(void*), void* arg) { + purge_thread_.set_thread_name("PurgeDirTask"); purge_thread_.StartThread(); purge_thread_.Schedule(function, arg); } @@ -814,6 +827,7 @@ void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& d } void PikaServer::KeyScanTaskSchedule(net::TaskFunc func, void* arg) { + key_scan_thread_.set_thread_name("KeyScanTask"); key_scan_thread_.StartThread(); key_scan_thread_.Schedule(func, arg); } @@ -1453,6 +1467,7 @@ void PikaServer::Bgslotsreload(const std::shared_ptr& db) { LOG(INFO) << "Start slot reloading"; // Start new thread if needed + bgsave_thread_.set_thread_name("SlotsReload"); bgsave_thread_.StartThread(); bgsave_thread_.Schedule(&DoBgslotsreload, static_cast(this)); } @@ -1520,6 +1535,7 @@ void PikaServer::Bgslotscleanup(std::vector cleanupSlots, const std::shared LOG(INFO) << "Start slot cleanup, slots: " << slotsStr << std::endl; // Start new thread if needed + bgslots_cleanup_thread_.set_thread_name("SlotsCleanup"); bgslots_cleanup_thread_.StartThread(); bgslots_cleanup_thread_.Schedule(&DoBgslotscleanup, static_cast(this)); } @@ -1624,7 +1640,7 @@ void DoBgslotscleanup(void* arg) { void PikaServer::ResetCacheAsync(uint32_t cache_num, std::shared_ptr db, cache::CacheConfig *cache_cfg) { if (PIKA_CACHE_STATUS_OK == db->cache()->CacheStatus() || PIKA_CACHE_STATUS_NONE == db->cache()->CacheStatus()) { - + common_bg_thread_.set_thread_name("ResetCacheTask"); common_bg_thread_.StartThread(); BGCacheTaskArg *arg = new BGCacheTaskArg(); arg->db = db; @@ -1648,7 +1664,7 @@ void PikaServer::ClearCacheDbAsync(std::shared_ptr db) { LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus(); return; } - + common_bg_thread_.set_thread_name("CacheClearThread"); common_bg_thread_.StartThread(); BGCacheTaskArg *arg = new BGCacheTaskArg(); arg->db = db; @@ -1716,7 +1732,7 @@ void PikaServer::ClearCacheDbAsyncV2(std::shared_ptr db) { LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus(); return; } - + common_bg_thread_.set_thread_name("V2CacheClearThread"); common_bg_thread_.StartThread(); BGCacheTaskArg *arg = new BGCacheTaskArg(); arg->db = db; diff --git a/src/rsync_client.cc b/src/rsync_client.cc index 0cf683ba75..7def7cbadc 100644 --- a/src/rsync_client.cc +++ b/src/rsync_client.cc @@ -28,6 +28,7 @@ RsyncClient::RsyncClient(const std::string& dir, const std::string& db_name) parallel_num_(g_pika_conf->max_rsync_parallel_num()) { wo_mgr_.reset(new WaitObjectManager()); client_thread_ = std::make_unique(3000, 60, wo_mgr_.get()); + client_thread_->set_thread_name("RsyncClientThread"); work_threads_.resize(GetParallelNum()); finished_work_cnt_.store(0); } diff --git a/src/rsync_server.cc b/src/rsync_server.cc index ea339af59c..5696719980 100644 --- a/src/rsync_server.cc +++ b/src/rsync_server.cc @@ -31,7 +31,7 @@ void RsyncWriteResp(RsyncService::RsyncResponse& response, std::shared_ptr& ips, const int port) { - work_thread_ = std::make_unique(2, 100000); + work_thread_ = std::make_unique(2, 100000, "RsyncServerWork"); rsync_server_thread_ = std::make_unique(ips, port, 1 * 1000, this); } @@ -46,6 +46,7 @@ void RsyncServer::Schedule(net::TaskFunc func, void* arg) { int RsyncServer::Start() { LOG(INFO) << "start RsyncServer ..."; + rsync_server_thread_->set_thread_name("RsyncServerThread"); int res = rsync_server_thread_->StartThread(); if (res != net::kSuccess) { LOG(FATAL) << "Start rsync Server Thread Error. ret_code: " << res << " message: "