Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:thread purge #2697

Merged
merged 8 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ thread-num : 1
# are dedicated to handling user requests.
thread-pool-size : 12

# This parameter is used to control whether to separate fast and slow commands.
# When slow-cmd-pool is set to yes, fast and slow commands are separated.
# When set to no, they are not separated.
slow-cmd-pool : no

# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1
Expand Down
2 changes: 0 additions & 2 deletions include/pika_client_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ class PikaClientProcessor {
int Start();
void Stop();
void SchedulePool(net::TaskFunc func, void* arg);
void ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);
size_t ThreadPoolCurQueueSize();
size_t ThreadPoolMaxQueueSize();

private:
std::unique_ptr<net::ThreadPool> pool_;
std::vector<std::unique_ptr<net::BGThread>> bg_threads_;
};
#endif // PIKA_CLIENT_PROCESSOR_H_
10 changes: 10 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return slotmigrate_;
}
bool slow_cmd_pool() {
std::shared_lock l(rwlock_);
return slow_cmd_pool_;
}
std::string server_id() {
std::shared_lock l(rwlock_);
return server_id_;
Expand Down Expand Up @@ -574,6 +578,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("slotmigrate", value ? "yes" : "no");
slotmigrate_.store(value);
}
void SetSlowCmdPool(const bool value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slow-cmd-pool", value ? "yes" : "no");
slow_cmd_pool_.store(value);
}
void SetSlotMigrateThreadNum(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slotmigrate-thread-num", std::to_string(value));
Expand Down Expand Up @@ -844,6 +853,7 @@ class PikaConf : public pstd::BaseConf {
std::string bgsave_path_;
std::string bgsave_prefix_;
std::string pidfile_;
std::atomic<bool> slow_cmd_pool_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

原子变量不用加锁保护,不要使用原子变量

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

原子变量不用加锁保护,不要使用原子变量

done


std::string compression_;
std::string compression_per_level_;
Expand Down
9 changes: 7 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class PikaServer : public pstd::noncopyable {
bool force_full_sync();
void SetForceFullSync(bool v);
void SetDispatchQueueLimit(int queue_limit);
void SetSlowCmdThreadPoolFlag(bool flag);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Ensure thread safety when toggling slow_cmd_thread_pool_flag_. The current implementation lacks mutex locks or atomic operations, which could lead to race conditions.

  • src/pika_server.cc: PikaServer::SetSlowCmdThreadPoolFlag
Analysis chain

Ensure thread safety when toggling slow_cmd_thread_pool_flag_.


Ensure thread safety around slow_cmd_thread_pool_flag_ usage by checking for mutex locks or atomic operations.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Check for potential race conditions related to `slow_cmd_thread_pool_flag_`.

# Test: Search for occurrences of `slow_cmd_thread_pool_flag_` being accessed.
rg --type cpp --context 5 'slow_cmd_thread_pool_flag_' src/

Length of output: 1088



Script:

#!/bin/bash
# Description: Check for thread safety mechanisms around `slow_cmd_thread_pool_flag_` usage.

# Test: Search for mutex locks or atomic operations related to `slow_cmd_thread_pool_flag_`.
rg --type cpp --context 5 'slow_cmd_thread_pool_flag_' src/ | grep -E 'std::mutex|std::lock_guard|std::atomic'

Length of output: 110

storage::StorageOptions storage_options();
std::unique_ptr<PikaDispatchThread>& pika_dispatch_thread() {
return pika_dispatch_thread_;
Expand Down Expand Up @@ -168,7 +169,6 @@ class PikaServer : public pstd::noncopyable {
void FinishMetaSync();
bool MetaSyncDone();
void ResetMetaSyncStatus();
void SetLoopDBStateMachine(bool need_loop);
int GetMetaSyncTimestamp();
void UpdateMetaSyncTimestamp();
void UpdateMetaSyncTimestampWithoutLock();
Expand All @@ -179,7 +179,7 @@ class PikaServer : public pstd::noncopyable {
* PikaClientProcessor Process Task
*/
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd);
void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);

// for info debug
size_t ClientProcessorThreadPoolCurQueueSize();
size_t ClientProcessorThreadPoolMaxQueueSize();
Expand Down Expand Up @@ -642,6 +642,11 @@ class PikaServer : public pstd::noncopyable {
* acl
*/
std::unique_ptr<::Acl> acl_ = nullptr;

/*
* fast and slow thread pools
*/
bool slow_cmd_thread_pool_flag_;
};

#endif
2 changes: 1 addition & 1 deletion src/net/src/net_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Thread::~Thread() = default;
void* Thread::RunThread(void* arg) {
auto thread = reinterpret_cast<Thread*>(arg);
if (!(thread->thread_name().empty())) {
SetThreadName(pthread_self(), thread->thread_name());
SetThreadName(pthread_self(), "pika");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1.修改ThreadPoolWorke 命名
2.修改继承thread/bgthread的命名
根据各自功能命名

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1.修改ThreadPoolWorke 命名 2.修改继承thread/bgthread的命名 根据各自功能命名

done

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里直接改成pika不合适吧?Thread应该是个基类,之前的那种方式看起来更合理点。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里直接改成pika不合适吧?Thread应该是个基类,之前的那种方式看起来更合理点。

这个代码 没有吧 应该已经没有改

}
thread->ThreadMain();
return nullptr;
Expand Down
20 changes: 20 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, g_pika_conf->slotmigrate() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "slow-cmd-pool", 1)) {
elements += 2;
EncodeString(&config_body, "slow-cmd-pool");
EncodeString(&config_body, g_pika_conf->slow_cmd_pool() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)!= 0) {
elements += 2;
EncodeString(&config_body, "slotmigrate-thread-num");
Expand Down Expand Up @@ -2129,6 +2135,7 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"requirepass",
"masterauth",
"slotmigrate",
"slow-cmd-pool",
"slotmigrate-thread-num",
"thread-migrate-keys-num",
"userpass",
Expand Down Expand Up @@ -2288,6 +2295,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetSlotMigrate(slotmigrate);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slow_cmd_pool") {
bool SlowCmdPool;
if (value == "yes") {
SlowCmdPool = true;
g_pika_server->SetSlowCmdThreadPoolFlag(SlowCmdPool);
} else if (value == "no") {
SlowCmdPool = false;
} else {
res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slow-cmd-pool'\r\n");
return;
}
g_pika_conf->SetSlowCmdPool(SlowCmdPool);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你schedule判断是否把请求塞给慢队列的时候,会读取g_pika_conf->slow_cmd_pool,setslowcmdthreadpoolflag中会等待慢队列为空然后才会stop线程池,那么这里就应该先改pika conf,保证不再有新的请求塞到队列里,然后你在等待队列都消费完之后返回。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setslowcmdthreadpoolflag

done

res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slowlog-log-slower-than") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n");
Expand Down
18 changes: 0 additions & 18 deletions src/pika_client_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@

PikaClientProcessor::PikaClientProcessor(size_t worker_num, size_t max_queue_size, const std::string& name_prefix) {
pool_ = std::make_unique<net::ThreadPool>(worker_num, max_queue_size, name_prefix + "Pool");
for (size_t i = 0; i < worker_num; ++i) {
bg_threads_.push_back(std::make_unique<net::BGThread>(max_queue_size));
bg_threads_.back()->set_thread_name(name_prefix + "BgThread");
}
}

PikaClientProcessor::~PikaClientProcessor() {
Expand All @@ -24,29 +20,15 @@ int PikaClientProcessor::Start() {
if (res != net::kSuccess) {
return res;
}
for (auto& bg_thread : bg_threads_) {
res = bg_thread->StartThread();
if (res != net::kSuccess) {
return res;
}
}
return res;
}

void PikaClientProcessor::Stop() {
pool_->stop_thread_pool();
for (auto & bg_thread : bg_threads_) {
bg_thread->StopThread();
}
}

void PikaClientProcessor::SchedulePool(net::TaskFunc func, void* arg) { pool_->Schedule(func, arg); }

void PikaClientProcessor::ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str) {
std::size_t index = std::hash<std::string>{}(hash_str) % bg_threads_.size();
bg_threads_[index]->Schedule(func, arg);
}

size_t PikaClientProcessor::ThreadPoolCurQueueSize() {
size_t cur_size = 0;
if (pool_) {
Expand Down
13 changes: 9 additions & 4 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ int PikaConf::Load() {
GetConfStr("slotmigrate", &smgrt);
slotmigrate_.store(smgrt == "yes" ? true : false);

// slow cmd thread pool
std::string slowcmdpool;
GetConfStr("slow-cmd-pool", &slowcmdpool);
slow_cmd_pool_.store(slowcmdpool == "yes" ? true : false);

int binlog_writer_num = 1;
GetConfInt("binlog-writer-num", &binlog_writer_num);
if (binlog_writer_num <= 0 || binlog_writer_num > 24) {
Expand Down Expand Up @@ -154,11 +159,11 @@ int PikaConf::Load() {
}

GetConfInt("slow-cmd-thread-pool-size", &slow_cmd_thread_pool_size_);
if (slow_cmd_thread_pool_size_ <= 0) {
slow_cmd_thread_pool_size_ = 12;
if (slow_cmd_thread_pool_size_ < 0) {
slow_cmd_thread_pool_size_ = 8;
}
if (slow_cmd_thread_pool_size_ > 100) {
slow_cmd_thread_pool_size_ = 100;
if (slow_cmd_thread_pool_size_ > 50) {
slow_cmd_thread_pool_size_ = 50;
}

std::string slow_cmd_list;
Expand Down
29 changes: 18 additions & 11 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ void DoPurgeDir(void* arg) {

PikaServer::PikaServer()
: exit_(false),
slow_cmd_thread_pool_flag_(false),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialization of slow_cmd_thread_pool_flag_ should be based on configuration.

Consider initializing slow_cmd_thread_pool_flag_ directly with the configuration value to avoid potential inconsistencies:

- slow_cmd_thread_pool_flag_(false),
+ slow_cmd_thread_pool_flag_(g_pika_conf->slow_cmd_pool()),
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
slow_cmd_thread_pool_flag_(false),
slow_cmd_thread_pool_flag_(g_pika_conf->slow_cmd_pool()),

last_check_compact_time_({0, 0}),
last_check_resume_time_({0, 0}),
repl_state_(PIKA_REPL_NO_CONNECT),
Expand Down Expand Up @@ -100,6 +101,7 @@ PikaServer::PikaServer()
}

acl_ = std::make_unique<::Acl>();
SetSlowCmdThreadPoolFlag(g_pika_conf->slow_cmd_pool());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The search results indicate that slow_cmd_pool is being set and used in various parts of the code, but there is no explicit validation logic evident from the provided context. The configuration is being set directly from the configuration file without additional validation.

  • src/pika_conf.cc: The configuration value for slow_cmd_pool is read from the configuration file and stored.
  • include/pika_conf.h: The slow_cmd_pool is defined as an atomic boolean and has getter and setter methods.

There is no evidence of validation logic for slow_cmd_pool before its use.

  • src/pika_conf.cc
  • include/pika_conf.h
Analysis chain

Ensure configuration is validated before use.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify that the configuration for `slow_cmd_pool` is validated before use.

# Test: Search for the validation logic. Expect: Validation logic before this usage.
rg --type python --context 5 $'g_pika_conf->slow_cmd_pool()'

Length of output: 92



Script:

#!/bin/bash
# Description: Verify that the configuration for `slow_cmd_pool` is validated before use.

# Test: Search for the validation logic. Expect: Validation logic before this usage.
rg --type cpp --context 5 'g_pika_conf->slow_cmd_pool()'

Length of output: 1637



Script:

#!/bin/bash
# Description: Verify that the configuration for `slow_cmd_pool` is validated before use.

# Test: Search for the initialization and validation logic of `slow_cmd_pool`.
rg --type cpp --context 5 'slow_cmd_pool'

Length of output: 4495

}

PikaServer::~PikaServer() {
Expand Down Expand Up @@ -166,12 +168,6 @@ void PikaServer::Start() {
LOG(FATAL) << "Start PikaClientProcessor Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}
ret = pika_slow_cmd_thread_pool_->start_thread_pool();
if (ret != net::kSuccess) {
dbs_.clear();
LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}
ret = pika_dispatch_thread_->StartThread();
if (ret != net::kSuccess) {
dbs_.clear();
Expand Down Expand Up @@ -205,6 +201,21 @@ void PikaServer::Start() {
LOG(INFO) << "Goodbye...";
}

void PikaServer::SetSlowCmdThreadPoolFlag(bool flag) {
slow_cmd_thread_pool_flag_ = flag;
int ret = 0;
if (flag) {
ret = pika_slow_cmd_thread_pool_->start_thread_pool();
if (ret != net::kSuccess) {
dbs_.clear();
LOG(ERROR) << "Start PikaLowLevelThreadPool Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pika_slow_cmd_thread_pool_->Schedule()的地方建议加个判断,因为现在is_slow_cmd和你开启slowcmdthreadpool是两个配置参数,如果is_slow_cmd判断为true,但是没有开slowcmdthreadpool,就会有问题。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pika_slow_cmd_thread_pool_->Schedule()的地方建议加个判断,因为现在is_slow_cmd和你开启slowcmdthreadpool是两个配置参数,如果is_slow_cmd判断为true,但是没有开slowcmdthreadpool,就会有问题。

done

} else {
pika_slow_cmd_thread_pool_->stop_thread_pool();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stop_thread_pool应该是不会等队列里的任务被执行完成再结束,stop_thread_pool的时候队列里还有任务,那这些任务就被丢掉了。进而对应连接就不会被再次唤醒了。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stop_thread_pool应该是不会等队列里的任务被执行完成再结束,stop_thread_pool的时候队列里还有任务,那这些任务就被丢掉了。进而对应连接就不会被再次唤醒了。

done

}
}

void PikaServer::Exit() {
g_pika_server->DisableCompact();
exit_mutex_.unlock();
Expand Down Expand Up @@ -707,17 +718,13 @@ void PikaServer::SetFirstMetaSync(bool v) {
}

void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd) {
if (is_slow_cmd) {
if (is_slow_cmd && g_pika_conf->slow_cmd_pool()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add safety checks to ensure pika_slow_cmd_thread_pool_ is initialized before scheduling tasks in ScheduleClientPool.

+ if (!pika_slow_cmd_thread_pool_) {
+   LOG(WARNING) << "Slow command thread pool is not initialized";
+   return;
+ }
  if (is_slow_cmd && g_pika_conf->slow_cmd_pool()) {
    pika_slow_cmd_thread_pool_->Schedule(func, arg);
    return;
  }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (is_slow_cmd && g_pika_conf->slow_cmd_pool()) {
if (!pika_slow_cmd_thread_pool_) {
LOG(WARNING) << "Slow command thread pool is not initialized";
return;
}
if (is_slow_cmd && g_pika_conf->slow_cmd_pool()) {
pika_slow_cmd_thread_pool_->Schedule(func, arg);
return;
}

pika_slow_cmd_thread_pool_->Schedule(func, arg);
return;
}
pika_client_processor_->SchedulePool(func, arg);
}

void PikaServer::ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str) {
pika_client_processor_->ScheduleBgThreads(func, arg, hash_str);
}

size_t PikaServer::ClientProcessorThreadPoolCurQueueSize() {
if (!pika_client_processor_) {
return 0;
Expand Down
1 change: 1 addition & 0 deletions src/rsync_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ void RsyncWriteResp(RsyncService::RsyncResponse& response, std::shared_ptr<net::
RsyncServer::RsyncServer(const std::set<std::string>& ips, const int port) {
work_thread_ = std::make_unique<net::ThreadPool>(2, 100000);
rsync_server_thread_ = std::make_unique<RsyncServerThread>(ips, port, 1 * 1000, this);
rsync_server_thread_->set_thread_name("RsyncServer");
Copy link

@coderabbitai coderabbitai bot Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using dynamic thread names for better traceability.

-  rsync_server_thread_->set_thread_name("RsyncServer");
+  rsync_server_thread_->set_thread_name("RsyncServer-" + std::to_string(port));

This change appends the port number to the thread name, making it easier to identify in logs and monitoring tools.

Committable suggestion was skipped due to low confidence.

}

RsyncServer::~RsyncServer() {
Expand Down
Loading