Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:Split the admin command out of the main thread #2727

Merged
merged 5 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ thread-pool-size : 12
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1


# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
admin-thread-pool-size : 2

# Slow cmd list e.g. hgetall, mset
slow-cmd-list :

Expand Down
2 changes: 2 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ enum CmdFlags {
kCmdFlagsStream = (1 << 20),
kCmdFlagsFast = (1 << 21),
kCmdFlagsSlow = (1 << 22),
kCmdFlagsMonitor = (1 << 23),
};

void inline RedisAppendContent(std::string& str, const std::string& value);
Expand Down Expand Up @@ -545,6 +546,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool IsNeedUpdateCache() const;
bool IsNeedReadCache() const;
bool IsNeedCacheDo() const;
bool IsMonitorCmd() const;
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
uint64_t GetDoDuration() const { return do_duration_; };
std::shared_ptr<DB> GetDB() const { return db_; };
Expand Down
10 changes: 10 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return slow_cmd_thread_pool_size_;
}
int admin_thread_pool_size() {
std::shared_lock l(rwlock_);
return admin_thread_pool_size_;
}
int sync_thread_num() {
std::shared_lock l(rwlock_);
return sync_thread_num_;
Expand Down Expand Up @@ -475,6 +479,11 @@ class PikaConf : public pstd::BaseConf {
slow_cmd_thread_pool_size_ = value;
}

void SetAdminThreadPoolSize(const int value) {
std::lock_guard l(rwlock_);
admin_thread_pool_size_ = value;
}

void SetSlaveof(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slaveof", value);
Expand Down Expand Up @@ -795,6 +804,7 @@ class PikaConf : public pstd::BaseConf {
int thread_num_ = 0;
int thread_pool_size_ = 0;
int slow_cmd_thread_pool_size_ = 0;
int admin_thread_pool_size_ = 0;
std::unordered_set<std::string> slow_cmd_set_;
int sync_thread_num_ = 0;
int sync_binlog_thread_num_ = 0;
Expand Down
3 changes: 2 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class PikaServer : public pstd::noncopyable {
/*
* PikaClientProcessor Process Task
*/
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd);
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd);
void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);
// for info debug
size_t ClientProcessorThreadPoolCurQueueSize();
Expand Down Expand Up @@ -553,6 +553,7 @@ class PikaServer : public pstd::noncopyable {
int worker_num_ = 0;
std::unique_ptr<PikaClientProcessor> pika_client_processor_;
std::unique_ptr<net::ThreadPool> pika_slow_cmd_thread_pool_;
std::unique_ptr<net::ThreadPool> pika_admin_cmd_thread_pool_;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Initialization and Cleanup of Admin Command Thread Pool

The pika_admin_cmd_thread_pool_ is initialized and used in src/pika_server.cc. However, there is no evidence of its cleanup in the destructor. Ensure that the thread pool is properly stopped and cleaned up to avoid resource leaks.

  • src/pika_server.cc: Initialization and usage of pika_admin_cmd_thread_pool_ found, but no cleanup in the destructor.
Analysis chain

Initialization and Cleanup of Admin Command Thread Pool

The pika_admin_cmd_thread_pool_ has been added to handle admin commands. Ensure that this thread pool is appropriately initialized in the constructor and properly cleaned up in the destructor to avoid resource leaks.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify initialization and cleanup of pika_admin_cmd_thread_pool_ in PikaServer's constructor and destructor.

# Test: Search for the initialization and cleanup in the constructor and destructor.
rg --type cpp --multiline $'PikaServer::PikaServer\\(\\)\\s*\\{[^\\}]*pika_admin_cmd_thread_pool_' 'include/pika_server.h'
rg --type cpp --multiline $'PikaServer::~PikaServer\\(\\)\\s*\\{[^\\}]*pika_admin_cmd_thread_pool_' 'include/pika_server.h'

Length of output: 234



Script:

#!/bin/bash
# Description: Search for all references to pika_admin_cmd_thread_pool_ in the codebase to verify its management.

# Test: Search for all occurrences of pika_admin_cmd_thread_pool_
rg 'pika_admin_cmd_thread_pool_' --type cpp

Length of output: 482

std::unique_ptr<PikaDispatchThread> pika_dispatch_thread_ = nullptr;

/*
Expand Down
7 changes: 7 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1495,6 +1495,13 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "slow-cmd-thread-pool-size");
EncodeNumber(&config_body, g_pika_conf->slow_cmd_thread_pool_size());
}

if (pstd::stringmatch(pattern.data(), "admin-thread-pool-size", 1) != 0) {
elements += 2;
EncodeString(&config_body, "admin-thread-pool-size");
EncodeNumber(&config_body, g_pika_conf->admin_thread_pool_size());
}

if (pstd::stringmatch(pattern.data(), "userblacklist", 1) != 0) {
elements += 2;
EncodeString(&config_body, "userblacklist");
Expand Down
4 changes: 3 additions & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,9 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
std::string opt = argvs[0][0];
pstd::StringToLower(opt);
bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt);
chejinge marked this conversation as resolved.
Show resolved Hide resolved
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd);
const std::shared_ptr<Cmd>& cmd_ = g_pika_cmd_table_manager->GetCmd(opt);
bool is_admin_cmd = cmd_->IsMonitorCmd();
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd);
return;
}
BatchExecRedisCmd(argvs);
Expand Down
7 changes: 4 additions & 3 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePurgelogsto, std::move(purgelogsto)));

std::unique_ptr<Cmd> pingptr =
std::make_unique<PingCmd>(kCmdNamePing, 1, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsFast);
std::make_unique<PingCmd>(kCmdNamePing, 1, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsFast | kCmdFlagsMonitor);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePing, std::move(pingptr)));

std::unique_ptr<Cmd> helloptr =
Expand Down Expand Up @@ -91,15 +91,15 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameShutdown, std::move(shutdownptr)));

std::unique_ptr<Cmd> infoptr =
std::make_unique<InfoCmd>(kCmdNameInfo, -1, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSlow);
std::make_unique<InfoCmd>(kCmdNameInfo, -1, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSlow | kCmdFlagsMonitor);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameInfo, std::move(infoptr)));

std::unique_ptr<Cmd> configptr =
std::make_unique<ConfigCmd>(kCmdNameConfig, -2, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameConfig, std::move(configptr)));

std::unique_ptr<Cmd> monitorptr =
std::make_unique<MonitorCmd>(kCmdNameMonitor, -1, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSlow);
std::make_unique<MonitorCmd>(kCmdNameMonitor, -1, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSlow | kCmdFlagsMonitor);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameMonitor, std::move(monitorptr)));

std::unique_ptr<Cmd> dbsizeptr =
Expand Down Expand Up @@ -962,6 +962,7 @@ bool Cmd::HasSubCommand() const { return subCmdName_.size() > 0; };
std::vector<std::string> Cmd::SubCommand() const { return subCmdName_; };
bool Cmd::IsAdminRequire() const { return (flag_ & kCmdFlagsAdminRequire); }
bool Cmd::IsNeedUpdateCache() const { return (flag_ & kCmdFlagsUpdateCache); }
bool Cmd::IsMonitorCmd() const { return (flag_ & kCmdFlagsMonitor); }
bool Cmd::IsNeedCacheDo() const {
if (g_pika_conf->IsCacheDisabledTemporarily()) {
return false;
Expand Down
8 changes: 8 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ int PikaConf::Load() {
slow_cmd_thread_pool_size_ = 100;
}

GetConfInt("admin-thread-pool-size", &admin_thread_pool_size_);
if (admin_thread_pool_size_ <= 0) {
admin_thread_pool_size_ = 2;
}
if (admin_thread_pool_size_ > 4) {
admin_thread_pool_size_ = 4;
}
Comment on lines +169 to +175
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validate the bounds for admin-thread-pool-size.

The validation for admin-thread-pool-size ensures it's between 2 and 4. Consider documenting the reason for these specific bounds in the code or configuration documentation to aid future maintainers or users in understanding these constraints.


Update related documentation and example configurations.

Please ensure that the addition of the admin-thread-pool-size configuration parameter is reflected in the pika.conf example file and any relevant documentation. This helps users understand how to configure this new feature.


std::string slow_cmd_list;
GetConfStr("slow-cmd-list", &slow_cmd_list);
SetSlowCmd(slow_cmd_list);
Expand Down
3 changes: 2 additions & 1 deletion src/pika_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ void BlockingBaseCmd::TryToServeBLrPopWithThisKey(const std::string& key, std::s

auto* args = new UnblockTaskArgs(key, std::move(db), dispatchThread);
bool is_slow_cmd = g_pika_conf->is_slow_cmd("LPOP") || g_pika_conf->is_slow_cmd("RPOP");
g_pika_server->ScheduleClientPool(&ServeAndUnblockConns, args, is_slow_cmd);
bool is_monitor_cmd = false;
g_pika_server->ScheduleClientPool(&ServeAndUnblockConns, args, is_slow_cmd, is_monitor_cmd);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The variable is_monitor_cmd is conditionally set based on command type and transaction state in src/pika_client_conn.cc. It is recommended to implement similar logic in src/pika_list.cc to ensure is_monitor_cmd is accurately determined.

  • src/pika_list.cc, lines 174-175
Analysis chain

Verify the handling of is_monitor_cmd in BlockingBaseCmd::TryToServeBLrPopWithThisKey.

The variable is_monitor_cmd is hardcoded to false. Verify if this should be dynamically determined based on the command type or other conditions to accurately reflect the new admin command handling logic.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify if there are conditions or configurations that should set `is_monitor_cmd` to true.
# Test: Search for conditions that affect `is_monitor_cmd` in the codebase.
rg --type cpp $'is_monitor_cmd = .*;'

Length of output: 306



Script:

#!/bin/bash
# Extract and analyze the relevant code segments from src/pika_client_conn.cc where is_monitor_cmd is set.
rg --type cpp --context 5 $'is_monitor_cmd =' src/pika_client_conn.cc

Length of output: 821

}

void BlockingBaseCmd::ServeAndUnblockConns(void* args) {
Expand Down
13 changes: 12 additions & 1 deletion src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ PikaServer::~PikaServer() {
// so we need to delete dispatch before worker.
pika_client_processor_->Stop();
pika_slow_cmd_thread_pool_->stop_thread_pool();
pika_admin_cmd_thread_pool_->stop_thread_pool();
{
std::lock_guard l(slave_mutex_);
auto iter = slaves_.begin();
Expand Down Expand Up @@ -172,6 +173,12 @@ void PikaServer::Start() {
LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}
ret = pika_admin_cmd_thread_pool_->start_thread_pool();
if (ret != net::kSuccess) {
dbs_.clear();
LOG(FATAL) << "Start PikaAdminThreadPool Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}
ret = pika_dispatch_thread_->StartThread();
if (ret != net::kSuccess) {
dbs_.clear();
Expand Down Expand Up @@ -706,11 +713,15 @@ void PikaServer::SetFirstMetaSync(bool v) {
first_meta_sync_ = v;
}

void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd) {
void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_monitor_cmd) {
if (is_slow_cmd) {
pika_slow_cmd_thread_pool_->Schedule(func, arg);
return;
}
if (is_monitor_cmd) {
pika_admin_cmd_thread_pool_->Schedule(func, arg);
return;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes to ScheduleClientPool method to handle different command types are crucial for proper command routing. Consider adding unit tests to validate correct command identification and routing.

pika_client_processor_->SchedulePool(func, arg);
}

Expand Down
Loading