diff --git a/conf/pika.conf b/conf/pika.conf index a99c30b30d..ea349c1ff1 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -27,6 +27,11 @@ thread-num : 1 # are dedicated to handling user requests. thread-pool-size : 12 +# This parameter is used to control whether to separate fast and slow commands. +# When slow-cmd-pool is set to yes, fast and slow commands are separated. +# When set to no, they are not separated. +slow-cmd-pool : no + # Size of the low level thread pool, The threads within this pool # are dedicated to handling slow user requests. slow-cmd-thread-pool-size : 1 diff --git a/include/pika_client_processor.h b/include/pika_client_processor.h index a2c628394e..dccd4ef96c 100644 --- a/include/pika_client_processor.h +++ b/include/pika_client_processor.h @@ -19,12 +19,10 @@ class PikaClientProcessor { int Start(); void Stop(); void SchedulePool(net::TaskFunc func, void* arg); - void ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str); size_t ThreadPoolCurQueueSize(); size_t ThreadPoolMaxQueueSize(); private: std::unique_ptr pool_; - std::vector> bg_threads_; }; #endif // PIKA_CLIENT_PROCESSOR_H_ diff --git a/include/pika_conf.h b/include/pika_conf.h index e0cb81062d..ab7ecf4982 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -186,6 +186,10 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return slotmigrate_; } + bool slow_cmd_pool() { + std::shared_lock l(rwlock_); + return slow_cmd_pool_; + } std::string server_id() { std::shared_lock l(rwlock_); return server_id_; @@ -574,6 +578,11 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("slotmigrate", value ? "yes" : "no"); slotmigrate_.store(value); } + void SetSlowCmdPool(const bool value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("slow-cmd-pool", value ? "yes" : "no"); + slow_cmd_pool_.store(value); + } void SetSlotMigrateThreadNum(const int value) { std::lock_guard l(rwlock_); TryPushDiffCommands("slotmigrate-thread-num", std::to_string(value)); @@ -844,6 +853,7 @@ class PikaConf : public pstd::BaseConf { std::string bgsave_path_; std::string bgsave_prefix_; std::string pidfile_; + std::atomic slow_cmd_pool_; std::string compression_; std::string compression_per_level_; diff --git a/include/pika_server.h b/include/pika_server.h index 2e26678d05..003d819c13 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -95,6 +95,7 @@ class PikaServer : public pstd::noncopyable { bool force_full_sync(); void SetForceFullSync(bool v); void SetDispatchQueueLimit(int queue_limit); + void SetSlowCmdThreadPoolFlag(bool flag); storage::StorageOptions storage_options(); std::unique_ptr& pika_dispatch_thread() { return pika_dispatch_thread_; @@ -168,7 +169,6 @@ class PikaServer : public pstd::noncopyable { void FinishMetaSync(); bool MetaSyncDone(); void ResetMetaSyncStatus(); - void SetLoopDBStateMachine(bool need_loop); int GetMetaSyncTimestamp(); void UpdateMetaSyncTimestamp(); void UpdateMetaSyncTimestampWithoutLock(); @@ -179,7 +179,7 @@ class PikaServer : public pstd::noncopyable { * PikaClientProcessor Process Task */ void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd); - void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str); + // for info debug size_t ClientProcessorThreadPoolCurQueueSize(); size_t ClientProcessorThreadPoolMaxQueueSize(); @@ -642,6 +642,11 @@ class PikaServer : public pstd::noncopyable { * acl */ std::unique_ptr<::Acl> acl_ = nullptr; + + /* + * fast and slow thread pools + */ + bool slow_cmd_thread_pool_flag_; }; #endif diff --git a/src/net/src/net_thread.cc b/src/net/src/net_thread.cc index a6a7b08994..f193fd7ddc 100644 --- a/src/net/src/net_thread.cc +++ b/src/net/src/net_thread.cc @@ -17,7 +17,7 @@ Thread::~Thread() = default; void* Thread::RunThread(void* arg) { auto thread = reinterpret_cast(arg); if (!(thread->thread_name().empty())) { - SetThreadName(pthread_self(), thread->thread_name()); + SetThreadName(pthread_self(), "pika"); } thread->ThreadMain(); return nullptr; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 2996d3a005..c3e038d695 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1600,6 +1600,12 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, g_pika_conf->slotmigrate() ? "yes" : "no"); } + if (pstd::stringmatch(pattern.data(), "slow-cmd-pool", 1)) { + elements += 2; + EncodeString(&config_body, "slow-cmd-pool"); + EncodeString(&config_body, g_pika_conf->slow_cmd_pool() ? "yes" : "no"); + } + if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)!= 0) { elements += 2; EncodeString(&config_body, "slotmigrate-thread-num"); @@ -2129,6 +2135,7 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { "requirepass", "masterauth", "slotmigrate", + "slow-cmd-pool", "slotmigrate-thread-num", "thread-migrate-keys-num", "userpass", @@ -2288,6 +2295,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { } g_pika_conf->SetSlotMigrate(slotmigrate); res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "slow_cmd_pool") { + bool SlowCmdPool; + if (value == "yes") { + SlowCmdPool = true; + g_pika_server->SetSlowCmdThreadPoolFlag(SlowCmdPool); + } else if (value == "no") { + SlowCmdPool = false; + } else { + res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slow-cmd-pool'\r\n"); + return; + } + g_pika_conf->SetSlowCmdPool(SlowCmdPool); + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "slowlog-log-slower-than") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n"); diff --git a/src/pika_client_processor.cc b/src/pika_client_processor.cc index 8a26ccd4a4..5a1c60cee0 100644 --- a/src/pika_client_processor.cc +++ b/src/pika_client_processor.cc @@ -9,10 +9,6 @@ PikaClientProcessor::PikaClientProcessor(size_t worker_num, size_t max_queue_size, const std::string& name_prefix) { pool_ = std::make_unique(worker_num, max_queue_size, name_prefix + "Pool"); - for (size_t i = 0; i < worker_num; ++i) { - bg_threads_.push_back(std::make_unique(max_queue_size)); - bg_threads_.back()->set_thread_name(name_prefix + "BgThread"); - } } PikaClientProcessor::~PikaClientProcessor() { @@ -24,29 +20,15 @@ int PikaClientProcessor::Start() { if (res != net::kSuccess) { return res; } - for (auto& bg_thread : bg_threads_) { - res = bg_thread->StartThread(); - if (res != net::kSuccess) { - return res; - } - } return res; } void PikaClientProcessor::Stop() { pool_->stop_thread_pool(); - for (auto & bg_thread : bg_threads_) { - bg_thread->StopThread(); - } } void PikaClientProcessor::SchedulePool(net::TaskFunc func, void* arg) { pool_->Schedule(func, arg); } -void PikaClientProcessor::ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str) { - std::size_t index = std::hash{}(hash_str) % bg_threads_.size(); - bg_threads_[index]->Schedule(func, arg); -} - size_t PikaClientProcessor::ThreadPoolCurQueueSize() { size_t cur_size = 0; if (pool_) { diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 2b4152e0f1..30b2366632 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -66,6 +66,11 @@ int PikaConf::Load() { GetConfStr("slotmigrate", &smgrt); slotmigrate_.store(smgrt == "yes" ? true : false); + // slow cmd thread pool + std::string slowcmdpool; + GetConfStr("slow-cmd-pool", &slowcmdpool); + slow_cmd_pool_.store(slowcmdpool == "yes" ? true : false); + int binlog_writer_num = 1; GetConfInt("binlog-writer-num", &binlog_writer_num); if (binlog_writer_num <= 0 || binlog_writer_num > 24) { @@ -154,11 +159,11 @@ int PikaConf::Load() { } GetConfInt("slow-cmd-thread-pool-size", &slow_cmd_thread_pool_size_); - if (slow_cmd_thread_pool_size_ <= 0) { - slow_cmd_thread_pool_size_ = 12; + if (slow_cmd_thread_pool_size_ < 0) { + slow_cmd_thread_pool_size_ = 8; } - if (slow_cmd_thread_pool_size_ > 100) { - slow_cmd_thread_pool_size_ = 100; + if (slow_cmd_thread_pool_size_ > 50) { + slow_cmd_thread_pool_size_ = 50; } std::string slow_cmd_list; diff --git a/src/pika_server.cc b/src/pika_server.cc index b5fa4f56d9..c57ba1194f 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -43,6 +43,7 @@ void DoPurgeDir(void* arg) { PikaServer::PikaServer() : exit_(false), + slow_cmd_thread_pool_flag_(false), last_check_compact_time_({0, 0}), last_check_resume_time_({0, 0}), repl_state_(PIKA_REPL_NO_CONNECT), @@ -100,6 +101,7 @@ PikaServer::PikaServer() } acl_ = std::make_unique<::Acl>(); + SetSlowCmdThreadPoolFlag(g_pika_conf->slow_cmd_pool()); } PikaServer::~PikaServer() { @@ -166,12 +168,6 @@ void PikaServer::Start() { LOG(FATAL) << "Start PikaClientProcessor Error: " << ret << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); } - ret = pika_slow_cmd_thread_pool_->start_thread_pool(); - if (ret != net::kSuccess) { - dbs_.clear(); - LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret - << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); - } ret = pika_dispatch_thread_->StartThread(); if (ret != net::kSuccess) { dbs_.clear(); @@ -205,6 +201,21 @@ void PikaServer::Start() { LOG(INFO) << "Goodbye..."; } +void PikaServer::SetSlowCmdThreadPoolFlag(bool flag) { + slow_cmd_thread_pool_flag_ = flag; + int ret = 0; + if (flag) { + ret = pika_slow_cmd_thread_pool_->start_thread_pool(); + if (ret != net::kSuccess) { + dbs_.clear(); + LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret + << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); + } + } else { + pika_slow_cmd_thread_pool_->stop_thread_pool(); + } +} + void PikaServer::Exit() { g_pika_server->DisableCompact(); exit_mutex_.unlock(); @@ -714,10 +725,6 @@ void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_ pika_client_processor_->SchedulePool(func, arg); } -void PikaServer::ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str) { - pika_client_processor_->ScheduleBgThreads(func, arg, hash_str); -} - size_t PikaServer::ClientProcessorThreadPoolCurQueueSize() { if (!pika_client_processor_) { return 0;