-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat:thread purge #2697
feat:thread purge #2697
Changes from 5 commits
a6ae2be
53c81c0
d6cb38d
526affd
13f1d46
a3f779b
6a1c80d
8bac0c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -110,6 +110,7 @@ class BackendThread : public Thread { | |
*/ | ||
int StartThread() override; | ||
int StopThread() override; | ||
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Properly overriding the - void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
+ // This method is now handled in the base class to avoid redundancy.
|
||
pstd::Status Write(int fd, const std::string& msg); | ||
pstd::Status Close(int fd); | ||
// Try to connect fd noblock, if return EINPROGRESS or EAGAIN or EWOULDBLOCK | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -110,6 +110,7 @@ class ClientThread : public Thread { | |||||
*/ | ||||||
int StartThread() override; | ||||||
int StopThread() override; | ||||||
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensuring that - void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
+ // This method is now handled in the base class to avoid redundancy. Committable suggestion
Suggested change
|
||||||
pstd::Status Write(const std::string& ip, int port, const std::string& msg); | ||||||
pstd::Status Close(const std::string& ip, int port); | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,7 @@ class Thread : public pstd::noncopyable { | |
|
||
std::string thread_name() const { return thread_name_; } | ||
|
||
void set_thread_name(const std::string& name) { thread_name_ = name; } | ||
virtual void set_thread_name(const std::string& name) { thread_name_ = name; } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding synchronization to To ensure thread safety when multiple threads might be setting the thread name simultaneously, consider using a mutex: + std::lock_guard<std::mutex> lock(name_mutex_);
thread_name_ = name; Add a private member
|
||
|
||
protected: | ||
std::atomic_bool should_stop_; | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -48,6 +48,8 @@ int BackendThread::StartThread() { | |||||
if (res) { | ||||||
return res; | ||||||
} | ||||||
set_thread_name("BackendThread"); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the server thread, setting the thread name to "BackendThread" is hardcoded. Consider using a dynamic naming strategy here as well to improve monitoring and debugging capabilities. - set_thread_name("BackendThread");
+ set_thread_name("BackendThread-" + std::to_string(port_)); Committable suggestion
Suggested change
|
||||||
|
||||||
return Thread::StartThread(); | ||||||
} | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,7 +66,7 @@ int DispatchThread::StartThread() { | |
// Adding timer tasks and run timertaskThread | ||
timerTaskThread_.AddTimerTask("blrpop_blocking_info_scan", 250, true, | ||
[this] { this->ScanExpiredBlockedConnsOfBlrpop(); }); | ||
|
||
timerTaskThread_.set_thread_name("TimerTaskThread"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Setting the thread name before starting the thread is a good practice. Consider allowing dynamic thread naming for greater flexibility, especially useful in environments with multiple timer tasks. |
||
timerTaskThread_.StartThread(); | ||
return ServerThread::StartThread(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,6 +75,7 @@ int HolyThread::StartThread() { | |
if (ret) { | ||
return ret; | ||
} | ||
set_thread_name("HolyThread"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HolyThread是其他类的父类,你在这里设置。会不会其他类设置的名字被这个替代了。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
已修复 |
||
return ServerThread::StartThread(); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -8,6 +8,7 @@ | |||||||||
|
||||||||||
#include <sys/time.h> | ||||||||||
|
||||||||||
#include <string> | ||||||||||
#include <utility> | ||||||||||
|
||||||||||
namespace net { | ||||||||||
|
@@ -24,7 +25,8 @@ int ThreadPool::Worker::start() { | |||||||||
return -1; | ||||||||||
} else { | ||||||||||
start_.store(true); | ||||||||||
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "Worker"); | ||||||||||
std::string thread_id_str = std::to_string(reinterpret_cast<unsigned long>(thread_id_)); | ||||||||||
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "_Worker_" + thread_id_str); | ||||||||||
Comment on lines
+28
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider improving portability of thread ID conversion. The current method of casting - std::string thread_id_str = std::to_string(reinterpret_cast<unsigned long>(thread_id_));
+ std::string thread_id_str = std::to_string(std::hash<std::thread::id>()(std::thread::id(thread_id_))); This uses the standard library's hash function to convert the thread ID to a string in a more portable manner. Committable suggestion
Suggested change
|
||||||||||
} | ||||||||||
} | ||||||||||
return 0; | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1600,6 +1600,12 @@ void ConfigCmd::ConfigGet(std::string& ret) { | |
EncodeString(&config_body, g_pika_conf->slotmigrate() ? "yes" : "no"); | ||
} | ||
|
||
if (pstd::stringmatch(pattern.data(), "slow-cmd-pool", 1)) { | ||
elements += 2; | ||
EncodeString(&config_body, "slow-cmd-pool"); | ||
EncodeString(&config_body, g_pika_conf->slow_cmd_pool() ? "yes" : "no"); | ||
} | ||
|
||
if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)!= 0) { | ||
elements += 2; | ||
EncodeString(&config_body, "slotmigrate-thread-num"); | ||
|
@@ -2129,6 +2135,7 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) { | |
"requirepass", | ||
"masterauth", | ||
"slotmigrate", | ||
"slow-cmd-pool", | ||
"slotmigrate-thread-num", | ||
"thread-migrate-keys-num", | ||
"userpass", | ||
|
@@ -2288,6 +2295,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) { | |
} | ||
g_pika_conf->SetSlotMigrate(slotmigrate); | ||
res_.AppendStringRaw("+OK\r\n"); | ||
} else if (set_item == "slow_cmd_pool") { | ||
bool SlowCmdPool; | ||
if (value == "yes") { | ||
SlowCmdPool = true; | ||
g_pika_server->SetSlowCmdThreadPoolFlag(SlowCmdPool); | ||
} else if (value == "no") { | ||
SlowCmdPool = false; | ||
} else { | ||
res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slow-cmd-pool'\r\n"); | ||
return; | ||
} | ||
g_pika_conf->SetSlowCmdPool(SlowCmdPool); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 你schedule判断是否把请求塞给慢队列的时候,会读取g_pika_conf->slow_cmd_pool,setslowcmdthreadpoolflag中会等待慢队列为空然后才会stop线程池,那么这里就应该先改pika conf,保证不再有新的请求塞到队列里,然后你在等待队列都消费完之后返回。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
done |
||
res_.AppendStringRaw("+OK\r\n"); | ||
} else if (set_item == "slowlog-log-slower-than") { | ||
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { | ||
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n"); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tip
Codebase Verification
Ensure thread safety when toggling
slow_cmd_thread_pool_flag_
. The current implementation lacks mutex locks or atomic operations, which could lead to race conditions.src/pika_server.cc
:PikaServer::SetSlowCmdThreadPoolFlag
Analysis chain
Ensure thread safety when toggling
slow_cmd_thread_pool_flag_
.Ensure thread safety around
slow_cmd_thread_pool_flag_
usage by checking for mutex locks or atomic operations.Scripts executed
The following scripts were executed for the analysis:
Script:
Length of output: 1088
Script:
Length of output: 110