Skip to content

Commit

Permalink
feat:thread purge (OpenAtomFoundation#2697)
Browse files Browse the repository at this point in the history
* feat:thread purge

---------

Co-authored-by: chejinge <[email protected]>
  • Loading branch information
chejinge and brother-jin committed Aug 1, 2024
1 parent 9ad0f16 commit d0d5f2e
Show file tree
Hide file tree
Showing 25 changed files with 115 additions and 48 deletions.
5 changes: 5 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ thread-num : 1
# are dedicated to handling user requests.
thread-pool-size : 12

# This parameter is used to control whether to separate fast and slow commands.
# When slow-cmd-pool is set to yes, fast and slow commands are separated.
# When set to no, they are not separated.
slow-cmd-pool : no

# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1
Expand Down
2 changes: 0 additions & 2 deletions include/pika_client_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ class PikaClientProcessor {
int Start();
void Stop();
void SchedulePool(net::TaskFunc func, void* arg);
void ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);
size_t ThreadPoolCurQueueSize();
size_t ThreadPoolMaxQueueSize();

private:
std::unique_ptr<net::ThreadPool> pool_;
std::vector<std::unique_ptr<net::BGThread>> bg_threads_;
};
#endif // PIKA_CLIENT_PROCESSOR_H_
10 changes: 10 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return slotmigrate_;
}
bool slow_cmd_pool() {
std::shared_lock l(rwlock_);
return slow_cmd_pool_;
}
std::string server_id() {
std::shared_lock l(rwlock_);
return server_id_;
Expand Down Expand Up @@ -538,6 +542,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("slotmigrate", value ? "yes" : "no");
slotmigrate_.store(value);
}
void SetSlowCmdPool(const bool value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slow-cmd-pool", value ? "yes" : "no");
slow_cmd_pool_.store(value);
}
void SetSlotMigrateThreadNum(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slotmigrate-thread-num", std::to_string(value));
Expand Down Expand Up @@ -786,6 +795,7 @@ class PikaConf : public pstd::BaseConf {
std::string bgsave_path_;
std::string bgsave_prefix_;
std::string pidfile_;
std::atomic<bool> slow_cmd_pool_;

std::string compression_;
std::string compression_per_level_;
Expand Down
3 changes: 3 additions & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class PikaReplBgWorker {
void QueueClear();
static void HandleBGWorkerWriteBinlog(void* arg);
static void HandleBGWorkerWriteDB(void* arg);
void SetThreadName(const std::string& thread_name) {
bg_thread_.set_thread_name(thread_name);
}

BinlogItem binlog_item_;
net::RedisParser redis_parser_;
Expand Down
9 changes: 7 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class PikaServer : public pstd::noncopyable {
bool force_full_sync();
void SetForceFullSync(bool v);
void SetDispatchQueueLimit(int queue_limit);
void SetSlowCmdThreadPoolFlag(bool flag);
storage::StorageOptions storage_options();

/*
Expand Down Expand Up @@ -174,7 +175,6 @@ class PikaServer : public pstd::noncopyable {
void FinishMetaSync();
bool MetaSyncDone();
void ResetMetaSyncStatus();
void SetLoopDBStateMachine(bool need_loop);
int GetMetaSyncTimestamp();
void UpdateMetaSyncTimestamp();
void UpdateMetaSyncTimestampWithoutLock();
Expand All @@ -185,7 +185,7 @@ class PikaServer : public pstd::noncopyable {
* PikaClientProcessor Process Task
*/
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd);
void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);

// for info debug
size_t ClientProcessorThreadPoolCurQueueSize();
size_t ClientProcessorThreadPoolMaxQueueSize();
Expand Down Expand Up @@ -648,6 +648,11 @@ class PikaServer : public pstd::noncopyable {
* acl
*/
std::unique_ptr<::Acl> acl_ = nullptr;

/*
* fast and slow thread pools
*/
bool slow_cmd_thread_pool_flag_;
};

#endif
1 change: 1 addition & 0 deletions src/net/include/backend_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class BackendThread : public Thread {
*/
int StartThread() override;
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
pstd::Status Write(int fd, const std::string& msg);
pstd::Status Close(int fd);
// Try to connect fd noblock, if return EINPROGRESS or EAGAIN or EWOULDBLOCK
Expand Down
1 change: 1 addition & 0 deletions src/net/include/client_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class ClientThread : public Thread {
*/
int StartThread() override;
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
pstd::Status Write(const std::string& ip, int port, const std::string& msg);
pstd::Status Close(const std::string& ip, int port);

Expand Down
2 changes: 1 addition & 1 deletion src/net/include/net_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Thread : public pstd::noncopyable {

std::string thread_name() const { return thread_name_; }

void set_thread_name(const std::string& name) { thread_name_ = name; }
virtual void set_thread_name(const std::string& name) { thread_name_ = name; }

protected:
std::atomic_bool should_stop_;
Expand Down
2 changes: 2 additions & 0 deletions src/net/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ class ServerThread : public Thread {
// Move into server thread
virtual void MoveConnIn(std::shared_ptr<NetConn> conn, const NotifyType& type) = 0;

void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }

virtual void KillAllConns() = 0;
virtual bool KillConn(const std::string& ip_port) = 0;

Expand Down
2 changes: 2 additions & 0 deletions src/net/src/backend_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ int BackendThread::StartThread() {
if (res) {
return res;
}
set_thread_name("BackendThread");

return Thread::StartThread();
}

Expand Down
2 changes: 2 additions & 0 deletions src/net/src/client_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ int ClientThread::StartThread() {
if (res) {
return res;
}
set_thread_name("ClientThread");

return Thread::StartThread();
}

Expand Down
2 changes: 1 addition & 1 deletion src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ int DispatchThread::StartThread() {
// Adding timer tasks and run timertaskThread
timerTaskThread_.AddTimerTask("blrpop_blocking_info_scan", 250, true,
[this] { this->ScanExpiredBlockedConnsOfBlrpop(); });

timerTaskThread_.set_thread_name("TimerTaskThread");
timerTaskThread_.StartThread();
return ServerThread::StartThread();
}
Expand Down
2 changes: 2 additions & 0 deletions src/net/src/holy_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class HolyThread : public ServerThread {

int StopThread() override;

void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }

void set_keepalive_timeout(int timeout) override { keepalive_timeout_ = timeout; }

int conn_num() const override;
Expand Down
2 changes: 1 addition & 1 deletion src/net/src/net_thread_name.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ inline bool SetThreadName(pthread_t id, const std::string& name) {
#else
inline bool SetThreadName(pthread_t id, const std::string& name) {
// printf ("no pthread_setname\n");
return false;
return pthread_setname_np(name.c_str()) == 0;
}
#endif
} // namespace net
Expand Down
1 change: 1 addition & 0 deletions src/net/src/net_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ int TimerTaskThread::StartThread() {
// if there is no timer task registered, no need of start the thread
return -1;
}
set_thread_name("TimerTask");
LOG(INFO) << "TimerTaskThread Starting...";
return Thread::StartThread();
}
Expand Down
1 change: 1 addition & 0 deletions src/net/src/net_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class TimerTaskThread : public Thread {
~TimerTaskThread() override;
int StartThread() override;
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }

uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task){
return timer_task_manager_.AddTimerTask(task_name, interval_ms, repeat_exec, task);
Expand Down
4 changes: 3 additions & 1 deletion src/net/src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <sys/time.h>

#include <string>
#include <utility>

namespace net {
Expand All @@ -24,7 +25,8 @@ int ThreadPool::Worker::start() {
return -1;
} else {
start_.store(true);
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "Worker");
std::string thread_id_str = std::to_string(reinterpret_cast<unsigned long>(thread_id_));
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "_Worker_" + thread_id_str);
}
}
return 0;
Expand Down
20 changes: 20 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, g_pika_conf->slotmigrate() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "slow-cmd-pool", 1)) {
elements += 2;
EncodeString(&config_body, "slow-cmd-pool");
EncodeString(&config_body, g_pika_conf->slow_cmd_pool() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)!= 0) {
elements += 2;
EncodeString(&config_body, "slotmigrate-thread-num");
Expand Down Expand Up @@ -2145,6 +2151,7 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"requirepass",
"masterauth",
"slotmigrate",
"slow-cmd-pool",
"slotmigrate-thread-num",
"thread-migrate-keys-num",
"userpass",
Expand Down Expand Up @@ -2299,6 +2306,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetSlotMigrate(slotmigrate);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slow_cmd_pool") {
bool SlowCmdPool;
if (value == "yes") {
SlowCmdPool = true;
} else if (value == "no") {
SlowCmdPool = false;
} else {
res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slow-cmd-pool'\r\n");
return;
}
g_pika_conf->SetSlowCmdPool(SlowCmdPool);
g_pika_server->SetSlowCmdThreadPoolFlag(SlowCmdPool);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slowlog-log-slower-than") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n");
Expand Down
18 changes: 0 additions & 18 deletions src/pika_client_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@

PikaClientProcessor::PikaClientProcessor(size_t worker_num, size_t max_queue_size, const std::string& name_prefix) {
pool_ = std::make_unique<net::ThreadPool>(worker_num, max_queue_size, name_prefix + "Pool");
for (size_t i = 0; i < worker_num; ++i) {
bg_threads_.push_back(std::make_unique<net::BGThread>(max_queue_size));
bg_threads_.back()->set_thread_name(name_prefix + "BgThread");
}
}

PikaClientProcessor::~PikaClientProcessor() {
Expand All @@ -24,29 +20,15 @@ int PikaClientProcessor::Start() {
if (res != net::kSuccess) {
return res;
}
for (auto& bg_thread : bg_threads_) {
res = bg_thread->StartThread();
if (res != net::kSuccess) {
return res;
}
}
return res;
}

void PikaClientProcessor::Stop() {
pool_->stop_thread_pool();
for (auto & bg_thread : bg_threads_) {
bg_thread->StopThread();
}
}

void PikaClientProcessor::SchedulePool(net::TaskFunc func, void* arg) { pool_->Schedule(func, arg); }

void PikaClientProcessor::ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str) {
std::size_t index = std::hash<std::string>{}(hash_str) % bg_threads_.size();
bg_threads_[index]->Schedule(func, arg);
}

size_t PikaClientProcessor::ThreadPoolCurQueueSize() {
size_t cur_size = 0;
if (pool_) {
Expand Down
13 changes: 9 additions & 4 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ int PikaConf::Load() {
GetConfStr("slotmigrate", &smgrt);
slotmigrate_.store(smgrt == "yes" ? true : false);

// slow cmd thread pool
std::string slowcmdpool;
GetConfStr("slow-cmd-pool", &slowcmdpool);
slow_cmd_pool_.store(slowcmdpool == "yes" ? true : false);

int binlog_writer_num = 1;
GetConfInt("binlog-writer-num", &binlog_writer_num);
if (binlog_writer_num <= 0 || binlog_writer_num > 24) {
Expand Down Expand Up @@ -150,11 +155,11 @@ int PikaConf::Load() {
}

GetConfInt("slow-cmd-thread-pool-size", &slow_cmd_thread_pool_size_);
if (slow_cmd_thread_pool_size_ <= 0) {
slow_cmd_thread_pool_size_ = 12;
if (slow_cmd_thread_pool_size_ < 0) {
slow_cmd_thread_pool_size_ = 8;
}
if (slow_cmd_thread_pool_size_ > 100) {
slow_cmd_thread_pool_size_ = 100;
if (slow_cmd_thread_pool_size_ > 50) {
slow_cmd_thread_pool_size_ = 50;
}

std::string slow_cmd_list;
Expand Down
10 changes: 8 additions & 2 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
client_thread_ = std::make_unique<PikaReplClientThread>(cron_interval, keepalive_timeout);
client_thread_->set_thread_name("PikaReplClient");
for (int i = 0; i < g_pika_conf->sync_binlog_thread_num(); i++) {
write_binlog_workers_.emplace_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
auto new_binlog_worker = std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE);
std::string binlog_worker_name = "ReplBinlogWorker" + std::to_string(i);
new_binlog_worker->SetThreadName(binlog_worker_name);
write_binlog_workers_.emplace_back(std::move(new_binlog_worker));
}
for (int i = 0; i < g_pika_conf->sync_thread_num(); ++i) {
write_db_workers_.emplace_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
auto new_db_worker = std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE);
std::string db_worker_name = "ReplWriteDBWorker" + std::to_string(i);
new_db_worker->SetThreadName(db_worker_name);
write_db_workers_.emplace_back(std::move(new_db_worker));
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/pika_repl_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ extern PikaServer* g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;

PikaReplServer::PikaReplServer(const std::set<std::string>& ips, int port, int cron_interval) {
server_tp_ = std::make_unique<net::ThreadPool>(PIKA_REPL_SERVER_TP_SIZE, 100000);
server_tp_ = std::make_unique<net::ThreadPool>(PIKA_REPL_SERVER_TP_SIZE, 100000, "PikaReplServer");
pika_repl_server_thread_ = std::make_unique<PikaReplServerThread>(ips, port, cron_interval);
pika_repl_server_thread_->set_thread_name("PikaReplServer");
}
Expand All @@ -27,6 +27,7 @@ PikaReplServer::~PikaReplServer() {
}

int PikaReplServer::Start() {
pika_repl_server_thread_->set_thread_name("PikaReplServer");
int res = pika_repl_server_thread_->StartThread();
if (res != net::kSuccess) {
LOG(FATAL) << "Start Pika Repl Server Thread Error: " << res
Expand Down
Loading

0 comments on commit d0d5f2e

Please sign in to comment.