-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat:thread purge #2697
feat:thread purge #2697
Changes from all commits
a6ae2be
53c81c0
d6cb38d
526affd
13f1d46
a3f779b
6a1c80d
8bac0c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -97,6 +97,7 @@ class PikaServer : public pstd::noncopyable { | |
bool force_full_sync(); | ||
void SetForceFullSync(bool v); | ||
void SetDispatchQueueLimit(int queue_limit); | ||
void SetSlowCmdThreadPoolFlag(bool flag); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tip Codebase Verification Ensure thread safety when toggling
Analysis chainEnsure thread safety when toggling Ensure thread safety around Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Check for potential race conditions related to `slow_cmd_thread_pool_flag_`.
# Test: Search for occurrences of `slow_cmd_thread_pool_flag_` being accessed.
rg --type cpp --context 5 'slow_cmd_thread_pool_flag_' src/
Length of output: 1088 Script: #!/bin/bash
# Description: Check for thread safety mechanisms around `slow_cmd_thread_pool_flag_` usage.
# Test: Search for mutex locks or atomic operations related to `slow_cmd_thread_pool_flag_`.
rg --type cpp --context 5 'slow_cmd_thread_pool_flag_' src/ | grep -E 'std::mutex|std::lock_guard|std::atomic'
Length of output: 110 |
||
storage::StorageOptions storage_options(); | ||
std::unique_ptr<PikaDispatchThread>& pika_dispatch_thread() { | ||
return pika_dispatch_thread_; | ||
|
@@ -170,7 +171,6 @@ class PikaServer : public pstd::noncopyable { | |
void FinishMetaSync(); | ||
bool MetaSyncDone(); | ||
void ResetMetaSyncStatus(); | ||
void SetLoopDBStateMachine(bool need_loop); | ||
int GetMetaSyncTimestamp(); | ||
void UpdateMetaSyncTimestamp(); | ||
void UpdateMetaSyncTimestampWithoutLock(); | ||
|
@@ -181,7 +181,7 @@ class PikaServer : public pstd::noncopyable { | |
* PikaClientProcessor Process Task | ||
*/ | ||
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd); | ||
void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str); | ||
|
||
// for info debug | ||
size_t ClientProcessorThreadPoolCurQueueSize(); | ||
size_t ClientProcessorThreadPoolMaxQueueSize(); | ||
|
@@ -644,6 +644,11 @@ class PikaServer : public pstd::noncopyable { | |
* acl | ||
*/ | ||
std::unique_ptr<::Acl> acl_ = nullptr; | ||
|
||
/* | ||
* fast and slow thread pools | ||
*/ | ||
bool slow_cmd_thread_pool_flag_; | ||
}; | ||
|
||
#endif |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -110,6 +110,7 @@ class BackendThread : public Thread { | |
*/ | ||
int StartThread() override; | ||
int StopThread() override; | ||
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Properly overriding the - void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
+ // This method is now handled in the base class to avoid redundancy.
|
||
pstd::Status Write(int fd, const std::string& msg); | ||
pstd::Status Close(int fd); | ||
// Try to connect fd noblock, if return EINPROGRESS or EAGAIN or EWOULDBLOCK | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -110,6 +110,7 @@ class ClientThread : public Thread { | |||||
*/ | ||||||
int StartThread() override; | ||||||
int StopThread() override; | ||||||
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensuring that - void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
+ // This method is now handled in the base class to avoid redundancy. Committable suggestion
Suggested change
|
||||||
pstd::Status Write(const std::string& ip, int port, const std::string& msg); | ||||||
pstd::Status Close(const std::string& ip, int port); | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,7 @@ class Thread : public pstd::noncopyable { | |
|
||
std::string thread_name() const { return thread_name_; } | ||
|
||
void set_thread_name(const std::string& name) { thread_name_ = name; } | ||
virtual void set_thread_name(const std::string& name) { thread_name_ = name; } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding synchronization to To ensure thread safety when multiple threads might be setting the thread name simultaneously, consider using a mutex: + std::lock_guard<std::mutex> lock(name_mutex_);
thread_name_ = name; Add a private member
|
||
|
||
protected: | ||
std::atomic_bool should_stop_; | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -48,6 +48,8 @@ int BackendThread::StartThread() { | |||||
if (res) { | ||||||
return res; | ||||||
} | ||||||
set_thread_name("BackendThread"); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the server thread, setting the thread name to "BackendThread" is hardcoded. Consider using a dynamic naming strategy here as well to improve monitoring and debugging capabilities. - set_thread_name("BackendThread");
+ set_thread_name("BackendThread-" + std::to_string(port_)); Committable suggestion
Suggested change
|
||||||
|
||||||
return Thread::StartThread(); | ||||||
} | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,7 +66,7 @@ int DispatchThread::StartThread() { | |
// Adding timer tasks and run timertaskThread | ||
timerTaskThread_.AddTimerTask("blrpop_blocking_info_scan", 250, true, | ||
[this] { this->ScanExpiredBlockedConnsOfBlrpop(); }); | ||
|
||
timerTaskThread_.set_thread_name("TimerTaskThread"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Setting the thread name before starting the thread is a good practice. Consider allowing dynamic thread naming for greater flexibility, especially useful in environments with multiple timer tasks. |
||
timerTaskThread_.StartThread(); | ||
return ServerThread::StartThread(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -8,6 +8,7 @@ | |||||||||
|
||||||||||
#include <sys/time.h> | ||||||||||
|
||||||||||
#include <string> | ||||||||||
#include <utility> | ||||||||||
|
||||||||||
namespace net { | ||||||||||
|
@@ -24,7 +25,8 @@ int ThreadPool::Worker::start() { | |||||||||
return -1; | ||||||||||
} else { | ||||||||||
start_.store(true); | ||||||||||
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "Worker"); | ||||||||||
std::string thread_id_str = std::to_string(reinterpret_cast<unsigned long>(thread_id_)); | ||||||||||
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "_Worker_" + thread_id_str); | ||||||||||
Comment on lines
+28
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider improving portability of thread ID conversion. The current method of casting - std::string thread_id_str = std::to_string(reinterpret_cast<unsigned long>(thread_id_));
+ std::string thread_id_str = std::to_string(std::hash<std::thread::id>()(std::thread::id(thread_id_))); This uses the standard library's hash function to convert the thread ID to a string in a more portable manner. Committable suggestion
Suggested change
|
||||||||||
} | ||||||||||
} | ||||||||||
return 0; | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1602,6 +1602,12 @@ void ConfigCmd::ConfigGet(std::string& ret) { | |
EncodeString(&config_body, g_pika_conf->slotmigrate() ? "yes" : "no"); | ||
} | ||
|
||
if (pstd::stringmatch(pattern.data(), "slow-cmd-pool", 1)) { | ||
elements += 2; | ||
EncodeString(&config_body, "slow-cmd-pool"); | ||
EncodeString(&config_body, g_pika_conf->slow_cmd_pool() ? "yes" : "no"); | ||
} | ||
|
||
if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)!= 0) { | ||
elements += 2; | ||
EncodeString(&config_body, "slotmigrate-thread-num"); | ||
|
@@ -2143,6 +2149,7 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) { | |
"requirepass", | ||
"masterauth", | ||
"slotmigrate", | ||
"slow-cmd-pool", | ||
"slotmigrate-thread-num", | ||
"thread-migrate-keys-num", | ||
"userpass", | ||
|
@@ -2302,6 +2309,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) { | |
} | ||
g_pika_conf->SetSlotMigrate(slotmigrate); | ||
res_.AppendStringRaw("+OK\r\n"); | ||
} else if (set_item == "slow_cmd_pool") { | ||
bool SlowCmdPool; | ||
if (value == "yes") { | ||
SlowCmdPool = true; | ||
} else if (value == "no") { | ||
SlowCmdPool = false; | ||
} else { | ||
res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slow-cmd-pool'\r\n"); | ||
return; | ||
} | ||
g_pika_conf->SetSlowCmdPool(SlowCmdPool); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 你schedule判断是否把请求塞给慢队列的时候,会读取g_pika_conf->slow_cmd_pool,setslowcmdthreadpoolflag中会等待慢队列为空然后才会stop线程池,那么这里就应该先改pika conf,保证不再有新的请求塞到队列里,然后你在等待队列都消费完之后返回。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
done |
||
g_pika_server->SetSlowCmdThreadPoolFlag(SlowCmdPool); | ||
res_.AppendStringRaw("+OK\r\n"); | ||
} else if (set_item == "slowlog-log-slower-than") { | ||
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { | ||
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n"); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
原子变量不用加锁保护,不要使用原子变量
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done